Compare commits
No commits in common. "cef74151d279c43163ebddb59e46ddd07e9b8302" and "1b3ca1af4b570921bc140988dde27cf938d44f68" have entirely different histories.
cef74151d2
...
1b3ca1af4b
28
Makefile
28
Makefile
@ -1,22 +1,22 @@
|
|||||||
|
|
||||||
CONFIG = ./config.nix
|
CONFIG = ./config.nix
|
||||||
BASH = $$(nix-build --no-out-link -A pkgs.bash)/bin/bash
|
|
||||||
|
|
||||||
entrypoint:
|
entrypoint:
|
||||||
nix-build -A entrypoint --arg config '(import ${CONFIG})'
|
nix-build -A entrypoint \
|
||||||
|
--arg baseConfig '(import ${CONFIG})'
|
||||||
|
|
||||||
install-systemd:
|
install:
|
||||||
$$(nix-build --no-out-link -A install --arg config '(import ${CONFIG})')
|
$$(nix-build -A install --arg baseConfig '(import ${CONFIG})')
|
||||||
|
|
||||||
test:
|
test:
|
||||||
${BASH} tmp-dev-env.sh \
|
$$(nix-build --no-out-link -A pkgs.bash)/bin/bash srv-dev-env.sh \
|
||||||
--run "cd src; go test ./... -count=1 -tags integration"
|
--run "cd srv/src && go test ./... -count=1 -tags integration"
|
||||||
|
@echo "\nTESTS PASSED!\n"
|
||||||
|
|
||||||
shell:
|
srv.dev-shell:
|
||||||
${BASH} tmp-dev-env.sh \
|
$$(nix-build --no-out-link -A pkgs.bash)/bin/bash srv-dev-env.sh \
|
||||||
--command " \
|
--command "cd srv/src; return"
|
||||||
cd src; \
|
|
||||||
echo 'Loading test data...'; \
|
srv.shell:
|
||||||
(cd cmd/load-test-data && go run main.go); \
|
nix-shell -A srv.shellWithBuild --arg baseConfig '(import ${CONFIG})' \
|
||||||
return; \
|
--command 'cd srv/src; return'
|
||||||
"
|
|
||||||
|
89
default.nix
89
default.nix
@ -1,69 +1,78 @@
|
|||||||
{
|
{
|
||||||
|
|
||||||
pkgs ? import (fetchTarball {
|
pkgsArg ? import (fetchTarball {
|
||||||
name = "nixpkgs-21-05";
|
name = "nixpkgs-21-05";
|
||||||
url = "https://github.com/NixOS/nixpkgs/archive/7e9b0dff974c89e070da1ad85713ff3c20b0ca97.tar.gz";
|
url = "https://github.com/NixOS/nixpkgs/archive/7e9b0dff974c89e070da1ad85713ff3c20b0ca97.tar.gz";
|
||||||
sha256 = "1ckzhh24mgz6jd1xhfgx0i9mijk6xjqxwsshnvq789xsavrmsc36";
|
sha256 = "1ckzhh24mgz6jd1xhfgx0i9mijk6xjqxwsshnvq789xsavrmsc36";
|
||||||
}) {},
|
}) {},
|
||||||
|
|
||||||
config ? import ./config.nix,
|
baseConfig ? import ./config.nix,
|
||||||
|
skipServices ? [],
|
||||||
|
|
||||||
}: rec {
|
}: rec {
|
||||||
|
|
||||||
inherit pkgs;
|
pkgs = pkgsArg;
|
||||||
|
|
||||||
init = pkgs.writeText "mediocre-blog-init" ''
|
config = baseConfig // {
|
||||||
|
redisListenPath = "${config.runDir}/redis";
|
||||||
|
};
|
||||||
|
|
||||||
export MEDIOCRE_BLOG_DATA_DIR="${config.dataDir}"
|
srv = pkgs.callPackage (import ./srv) {
|
||||||
|
inherit config;
|
||||||
|
};
|
||||||
|
|
||||||
# mailing list
|
redisCfg = pkgs.writeText "mediocre-blog-redisCfg" ''
|
||||||
export MEDIOCRE_BLOG_ML_SMTP_ADDR="${config.mlSMTPAddr}"
|
port 0
|
||||||
export MEDIOCRE_BLOG_ML_SMTP_AUTH="${config.mlSMTPAuth}"
|
unixsocket ${config.redisListenPath}
|
||||||
export MEDIOCRE_BLOG_ML_PUBLIC_URL="${config.publicURL}"
|
daemonize no
|
||||||
|
loglevel notice
|
||||||
# pow
|
logfile ""
|
||||||
export MEDIOCRE_BLOG_POW_SECRET="${config.powSecret}"
|
appendonly yes
|
||||||
|
appendfilename "appendonly.aof"
|
||||||
# http
|
dir ${config.dataDir}/redis
|
||||||
export MEDIOCRE_BLOG_HTTP_PUBLIC_URL="${config.publicURL}"
|
|
||||||
export MEDIOCRE_BLOG_HTTP_LISTEN_PROTO="${config.httpListenProto}"
|
|
||||||
export MEDIOCRE_BLOG_HTTP_LISTEN_ADDR="${config.httpListenAddr}"
|
|
||||||
export MEDIOCRE_BLOG_HTTP_AUTH_USERS='${builtins.toJSON config.httpAuthUsers}'
|
|
||||||
export MEDIOCRE_BLOG_HTTP_AUTH_RATELIMIT='${config.httpAuthRatelimit}'
|
|
||||||
'';
|
'';
|
||||||
|
|
||||||
bin = pkgs.buildGoModule {
|
redisBin = pkgs.writeScript "mediocre-blog-redisBin" ''
|
||||||
pname = "mediocre-blog";
|
#!/bin/sh
|
||||||
version = "dev";
|
mkdir -p ${config.dataDir}/redis
|
||||||
src = ./src;
|
exec ${pkgs.redis}/bin/redis-server ${redisCfg}
|
||||||
vendorSha256 = "sha256:1vazrrg8rs9n8x40c9r53h9qnyxw59xkp0aq7jl15fliigk6q0cr";
|
'';
|
||||||
|
|
||||||
subPackages = [ "cmd/mediocre-blog" ];
|
srvCircusCfg = ''
|
||||||
|
[watcher:srv]
|
||||||
|
cmd = ${srv.bin}
|
||||||
|
numprocesses = 1
|
||||||
|
'';
|
||||||
|
|
||||||
# disable tests
|
redisCircusCfg = ''
|
||||||
checkPhase = '''';
|
[watcher:redis]
|
||||||
};
|
cmd = ${redisBin}
|
||||||
|
numprocesses = 1
|
||||||
|
'';
|
||||||
|
|
||||||
|
circusCfg = pkgs.writeText "mediocre-blog-circusCfg" ''
|
||||||
|
[circus]
|
||||||
|
endpoint = tcp://127.0.0.1:0
|
||||||
|
pubsub_endpoint = tcp://127.0.0.1:0
|
||||||
|
|
||||||
|
${if (!builtins.elem "srv" skipServices) then srvCircusCfg else ""}
|
||||||
|
|
||||||
|
${if (!builtins.elem "redis" skipServices) then redisCircusCfg else ""}
|
||||||
|
'';
|
||||||
|
|
||||||
entrypoint = pkgs.writeScript "mediocre-blog-entrypoint" ''
|
entrypoint = pkgs.writeScript "mediocre-blog-entrypoint" ''
|
||||||
#!${pkgs.bash}/bin/bash
|
#!/bin/sh
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
source ${init}
|
|
||||||
|
|
||||||
|
if [ ! -d ${config.runDir} ]; then
|
||||||
mkdir -p ${config.runDir}
|
mkdir -p ${config.runDir}
|
||||||
|
fi
|
||||||
|
|
||||||
mkdir -p ${config.dataDir}
|
mkdir -p ${config.dataDir}
|
||||||
|
exec ${pkgs.circus}/bin/circusd ${circusCfg}
|
||||||
exec ${bin}/bin/mediocre-blog "$@"
|
|
||||||
'';
|
'';
|
||||||
|
|
||||||
shell = pkgs.stdenv.mkDerivation {
|
|
||||||
name = "mediocre-blog-shell";
|
|
||||||
buildInputs = [ pkgs.go pkgs.sqlite ];
|
|
||||||
shellHook = ''
|
|
||||||
source ${init}
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
|
|
||||||
service = pkgs.writeText "mediocre-blog" ''
|
service = pkgs.writeText "mediocre-blog" ''
|
||||||
[Unit]
|
[Unit]
|
||||||
Description=mediocregopher mediocre blog
|
Description=mediocregopher mediocre blog
|
||||||
|
29
srv-dev-env.sh
Normal file
29
srv-dev-env.sh
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
|
||||||
|
test_dir="$(mktemp -d)"
|
||||||
|
|
||||||
|
mkdir -p "$test_dir"/run
|
||||||
|
mkdir -p "$test_dir"/data
|
||||||
|
|
||||||
|
test_cfg="(import ./config.nix) // {
|
||||||
|
runDir=\"${test_dir}/run\";
|
||||||
|
dataDir=\"${test_dir}/data\";
|
||||||
|
}"
|
||||||
|
|
||||||
|
entrypoint=$(nix-build --no-out-link -A entrypoint \
|
||||||
|
--arg baseConfig "$test_cfg" \
|
||||||
|
--arg skipServices '["srv"]')
|
||||||
|
|
||||||
|
$entrypoint &
|
||||||
|
trap "kill $!; wait; rm -rf $test_dir" EXIT
|
||||||
|
|
||||||
|
# NOTE this is a bit of a hack... the location of the redis socket's source of
|
||||||
|
# truth is in default.nix, but it's not clear how to get that from there to
|
||||||
|
# here, so we reproduce the calculation here.
|
||||||
|
while [ ! -e $test_dir/run/redis ]; do
|
||||||
|
echo "waiting for redis unix socket"
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
nix-shell -A srv.shell \
|
||||||
|
--arg baseConfig "$test_cfg" \
|
||||||
|
"$@"
|
63
srv/default.nix
Normal file
63
srv/default.nix
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
{
|
||||||
|
bash,
|
||||||
|
go,
|
||||||
|
buildGoModule,
|
||||||
|
sqlite,
|
||||||
|
writeScript,
|
||||||
|
writeText,
|
||||||
|
stdenv,
|
||||||
|
|
||||||
|
config,
|
||||||
|
}: rec {
|
||||||
|
|
||||||
|
init = writeText "mediocre-blog-srv-init" ''
|
||||||
|
|
||||||
|
export MEDIOCRE_BLOG_DATA_DIR="${config.dataDir}"
|
||||||
|
|
||||||
|
# mailing list
|
||||||
|
export MEDIOCRE_BLOG_ML_SMTP_ADDR="${config.mlSMTPAddr}"
|
||||||
|
export MEDIOCRE_BLOG_ML_SMTP_AUTH="${config.mlSMTPAuth}"
|
||||||
|
export MEDIOCRE_BLOG_ML_PUBLIC_URL="${config.publicURL}"
|
||||||
|
|
||||||
|
# redis
|
||||||
|
export MEDIOCRE_BLOG_REDIS_PROTO=unix
|
||||||
|
export MEDIOCRE_BLOG_REDIS_ADDR="${config.redisListenPath}"
|
||||||
|
|
||||||
|
# pow
|
||||||
|
export MEDIOCRE_BLOG_POW_SECRET="${config.powSecret}"
|
||||||
|
|
||||||
|
# http
|
||||||
|
export MEDIOCRE_BLOG_HTTP_PUBLIC_URL="${config.publicURL}"
|
||||||
|
export MEDIOCRE_BLOG_HTTP_LISTEN_PROTO="${config.httpListenProto}"
|
||||||
|
export MEDIOCRE_BLOG_HTTP_LISTEN_ADDR="${config.httpListenAddr}"
|
||||||
|
export MEDIOCRE_BLOG_HTTP_AUTH_USERS='${builtins.toJSON config.httpAuthUsers}'
|
||||||
|
export MEDIOCRE_BLOG_HTTP_AUTH_RATELIMIT='${config.httpAuthRatelimit}'
|
||||||
|
'';
|
||||||
|
|
||||||
|
build = buildGoModule {
|
||||||
|
pname = "mediocre-blog-srv";
|
||||||
|
version = "dev";
|
||||||
|
src = ./src;
|
||||||
|
vendorSha256 = "sha256-C3hyPDO+6oTUeoGP/ZzBn5Y4V/q1jI12BwkR9NADHn0=";
|
||||||
|
|
||||||
|
# disable tests
|
||||||
|
checkPhase = '''';
|
||||||
|
};
|
||||||
|
|
||||||
|
bin = writeScript "mediocre-blog-srv-bin" ''
|
||||||
|
#!${bash}/bin/bash
|
||||||
|
source ${init}
|
||||||
|
exec ${build}/bin/mediocre-blog
|
||||||
|
'';
|
||||||
|
|
||||||
|
shell = stdenv.mkDerivation {
|
||||||
|
name = "mediocre-blog-srv-shell";
|
||||||
|
buildInputs = [ go sqlite ];
|
||||||
|
shellHook = ''
|
||||||
|
source ${init}
|
||||||
|
|
||||||
|
echo "Loading test data..."
|
||||||
|
(cd srv/src/cmd/load-test-data && go run main.go)
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
}
|
55
srv/src/cfg/radix_client.go
Normal file
55
srv/src/cfg/radix_client.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package cfg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
|
||||||
|
"github.com/mediocregopher/radix/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RadixClient is a single redis client which can be configured.
|
||||||
|
type RadixClient struct {
|
||||||
|
radix.Client
|
||||||
|
|
||||||
|
proto, addr string
|
||||||
|
poolSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupCfg implement the cfg.Cfger interface.
|
||||||
|
func (c *RadixClient) SetupCfg(cfg *Cfg) {
|
||||||
|
|
||||||
|
cfg.StringVar(&c.proto, "redis-proto", "tcp", "Network protocol to connect to redis over, can be tcp or unix")
|
||||||
|
cfg.StringVar(&c.addr, "redis-addr", "127.0.0.1:6379", "Address redis is expected to listen on")
|
||||||
|
cfg.IntVar(&c.poolSize, "redis-pool-size", 5, "Number of connections in the redis pool to keep")
|
||||||
|
|
||||||
|
cfg.OnInit(func(ctx context.Context) error {
|
||||||
|
client, err := (radix.PoolConfig{
|
||||||
|
Size: c.poolSize,
|
||||||
|
}).New(
|
||||||
|
ctx, c.proto, c.addr,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"initializing redis pool of size %d at %s://%s: %w",
|
||||||
|
c.poolSize, c.proto, c.addr, err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Client = client
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Annotate implements mctx.Annotator interface.
|
||||||
|
func (c *RadixClient) Annotate(a mctx.Annotations) {
|
||||||
|
a["redisProto"] = c.proto
|
||||||
|
a["redisAddr"] = c.addr
|
||||||
|
a["redisPoolSize"] = c.poolSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close cleans up the radix client.
|
||||||
|
func (c *RadixClient) Close() error {
|
||||||
|
return c.Client.Close()
|
||||||
|
}
|
467
srv/src/chat/chat.go
Normal file
467
srv/src/chat/chat.go
Normal file
@ -0,0 +1,467 @@
|
|||||||
|
// Package chat implements a simple chatroom system.
|
||||||
|
package chat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||||
|
"github.com/mediocregopher/radix/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrInvalidArg is returned from methods in this package when a call fails due
|
||||||
|
// to invalid input.
|
||||||
|
type ErrInvalidArg struct {
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ErrInvalidArg) Error() string {
|
||||||
|
return fmt.Sprintf("invalid argument: %v", e.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errInvalidMessageID = ErrInvalidArg{Err: errors.New("invalid Message ID")}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Message describes a message which has been posted to a Room.
|
||||||
|
type Message struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
UserID UserID `json:"userID"`
|
||||||
|
Body string `json:"body"`
|
||||||
|
CreatedAt int64 `json:"createdAt,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func msgFromStreamEntry(entry radix.StreamEntry) (Message, error) {
|
||||||
|
|
||||||
|
// NOTE this should probably be a shortcut in radix
|
||||||
|
var bodyStr string
|
||||||
|
for _, field := range entry.Fields {
|
||||||
|
if field[0] == "json" {
|
||||||
|
bodyStr = field[1]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bodyStr == "" {
|
||||||
|
return Message{}, errors.New("no 'json' field")
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg Message
|
||||||
|
if err := json.Unmarshal([]byte(bodyStr), &msg); err != nil {
|
||||||
|
return Message{}, fmt.Errorf(
|
||||||
|
"json unmarshaling body %q: %w", bodyStr, err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.ID = entry.ID.String()
|
||||||
|
msg.CreatedAt = int64(entry.ID.Time / 1000)
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageIterator returns a sequence of Messages which may or may not be
|
||||||
|
// unbounded.
|
||||||
|
type MessageIterator interface {
|
||||||
|
|
||||||
|
// Next blocks until it returns the next Message in the sequence, or the
|
||||||
|
// context error if the context is cancelled, or io.EOF if the sequence has
|
||||||
|
// been exhausted.
|
||||||
|
Next(context.Context) (Message, error)
|
||||||
|
|
||||||
|
// Close should always be called once Next has returned an error or the
|
||||||
|
// MessageIterator will no longer be used.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// HistoryOpts are passed into Room's History method in order to affect its
|
||||||
|
// result. All fields are optional.
|
||||||
|
type HistoryOpts struct {
|
||||||
|
Limit int // defaults to, and is capped at, 100.
|
||||||
|
Cursor string // If not given then the most recent Messages are returned.
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o HistoryOpts) sanitize() (HistoryOpts, error) {
|
||||||
|
if o.Limit <= 0 || o.Limit > 100 {
|
||||||
|
o.Limit = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.Cursor != "" {
|
||||||
|
id, err := parseStreamEntryID(o.Cursor)
|
||||||
|
if err != nil {
|
||||||
|
return HistoryOpts{}, fmt.Errorf("parsing Cursor: %w", err)
|
||||||
|
}
|
||||||
|
o.Cursor = id.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
return o, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Room implements functionality related to a single, unique chat room.
|
||||||
|
type Room interface {
|
||||||
|
|
||||||
|
// Append accepts a new Message and stores it at the end of the room's
|
||||||
|
// history. The original Message is returned with any relevant fields (e.g.
|
||||||
|
// ID) updated.
|
||||||
|
Append(context.Context, Message) (Message, error)
|
||||||
|
|
||||||
|
// Returns a cursor and the list of historical Messages in time descending
|
||||||
|
// order. The cursor can be passed into the next call to History to receive
|
||||||
|
// the next set of Messages.
|
||||||
|
History(context.Context, HistoryOpts) (string, []Message, error)
|
||||||
|
|
||||||
|
// Listen returns a MessageIterator which will return all Messages appended
|
||||||
|
// to the Room since the given ID. Once all existing messages are iterated
|
||||||
|
// through then the MessageIterator will begin blocking until a new Message
|
||||||
|
// is posted.
|
||||||
|
Listen(ctx context.Context, sinceID string) (MessageIterator, error)
|
||||||
|
|
||||||
|
// Delete deletes a Message from the Room.
|
||||||
|
Delete(ctx context.Context, id string) error
|
||||||
|
|
||||||
|
// Close is used to clean up all resources created by the Room.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoomParams are used to instantiate a new Room. All fields are required unless
|
||||||
|
// otherwise noted.
|
||||||
|
type RoomParams struct {
|
||||||
|
Logger *mlog.Logger
|
||||||
|
Redis radix.Client
|
||||||
|
ID string
|
||||||
|
MaxMessages int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p RoomParams) streamKey() string {
|
||||||
|
return fmt.Sprintf("chat:{%s}:stream", p.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
type room struct {
|
||||||
|
params RoomParams
|
||||||
|
|
||||||
|
closeCtx context.Context
|
||||||
|
closeCancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
listeningL sync.Mutex
|
||||||
|
listening map[chan Message]struct{}
|
||||||
|
listeningLastID radix.StreamEntryID
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRoom initializes and returns a new Room instance.
|
||||||
|
func NewRoom(ctx context.Context, params RoomParams) (Room, error) {
|
||||||
|
|
||||||
|
params.Logger = params.Logger.WithNamespace("chat-room")
|
||||||
|
|
||||||
|
r := &room{
|
||||||
|
params: params,
|
||||||
|
listening: map[chan Message]struct{}{},
|
||||||
|
}
|
||||||
|
|
||||||
|
r.closeCtx, r.closeCancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// figure out the most recent message, if any.
|
||||||
|
lastEntryID, err := r.mostRecentMsgID(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("discovering most recent entry ID in stream: %w", err)
|
||||||
|
}
|
||||||
|
r.listeningLastID = lastEntryID
|
||||||
|
|
||||||
|
r.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer r.wg.Done()
|
||||||
|
r.readStreamLoop(r.closeCtx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) Close() error {
|
||||||
|
r.closeCancel()
|
||||||
|
r.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) mostRecentMsgID(ctx context.Context) (radix.StreamEntryID, error) {
|
||||||
|
|
||||||
|
var entries []radix.StreamEntry
|
||||||
|
err := r.params.Redis.Do(ctx, radix.Cmd(
|
||||||
|
&entries,
|
||||||
|
"XREVRANGE", r.params.streamKey(), "+", "-", "COUNT", "1",
|
||||||
|
))
|
||||||
|
|
||||||
|
if err != nil || len(entries) == 0 {
|
||||||
|
return radix.StreamEntryID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries[0].ID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) Append(ctx context.Context, msg Message) (Message, error) {
|
||||||
|
msg.ID = "" // just in case
|
||||||
|
|
||||||
|
b, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return Message{}, fmt.Errorf("json marshaling Message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := r.params.streamKey()
|
||||||
|
maxLen := strconv.Itoa(r.params.MaxMessages)
|
||||||
|
body := string(b)
|
||||||
|
|
||||||
|
var id radix.StreamEntryID
|
||||||
|
|
||||||
|
err = r.params.Redis.Do(ctx, radix.Cmd(
|
||||||
|
&id, "XADD", key, "MAXLEN", "=", maxLen, "*", "json", body,
|
||||||
|
))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return Message{}, fmt.Errorf("posting message to redis: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.ID = id.String()
|
||||||
|
msg.CreatedAt = int64(id.Time / 1000)
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const zeroCursor = "0-0"
|
||||||
|
|
||||||
|
func (r *room) History(ctx context.Context, opts HistoryOpts) (string, []Message, error) {
|
||||||
|
opts, err := opts.sanitize()
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := r.params.streamKey()
|
||||||
|
end := opts.Cursor
|
||||||
|
if end == "" {
|
||||||
|
end = "+"
|
||||||
|
}
|
||||||
|
start := "-"
|
||||||
|
count := strconv.Itoa(opts.Limit)
|
||||||
|
|
||||||
|
msgs := make([]Message, 0, opts.Limit)
|
||||||
|
streamEntries := make([]radix.StreamEntry, 0, opts.Limit)
|
||||||
|
|
||||||
|
err = r.params.Redis.Do(ctx, radix.Cmd(
|
||||||
|
&streamEntries,
|
||||||
|
"XREVRANGE", key, end, start, "COUNT", count,
|
||||||
|
))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("calling XREVRANGE: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var oldestEntryID radix.StreamEntryID
|
||||||
|
|
||||||
|
for _, entry := range streamEntries {
|
||||||
|
oldestEntryID = entry.ID
|
||||||
|
|
||||||
|
msg, err := msgFromStreamEntry(entry)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf(
|
||||||
|
"parsing stream entry %q: %w", entry.ID, err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(msgs) < opts.Limit {
|
||||||
|
return zeroCursor, msgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cursor := oldestEntryID.Prev()
|
||||||
|
return cursor.String(), msgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) readStream(ctx context.Context) error {
|
||||||
|
|
||||||
|
r.listeningL.Lock()
|
||||||
|
lastEntryID := r.listeningLastID
|
||||||
|
r.listeningL.Unlock()
|
||||||
|
|
||||||
|
redisAddr := r.params.Redis.Addr()
|
||||||
|
redisConn, err := radix.Dial(ctx, redisAddr.Network(), redisAddr.String())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating redis connection: %w", err)
|
||||||
|
}
|
||||||
|
defer redisConn.Close()
|
||||||
|
|
||||||
|
streamReader := (radix.StreamReaderConfig{}).New(
|
||||||
|
redisConn,
|
||||||
|
map[string]radix.StreamConfig{
|
||||||
|
r.params.streamKey(): {After: lastEntryID},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
for {
|
||||||
|
dlCtx, dlCtxCancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
_, streamEntry, err := streamReader.Next(dlCtx)
|
||||||
|
dlCtxCancel()
|
||||||
|
|
||||||
|
if errors.Is(err, radix.ErrNoStreamEntries) {
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
return fmt.Errorf("fetching next entry from stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := msgFromStreamEntry(streamEntry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parsing stream entry %q: %w", streamEntry, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.listeningL.Lock()
|
||||||
|
|
||||||
|
var dropped int
|
||||||
|
for ch := range r.listening {
|
||||||
|
select {
|
||||||
|
case ch <- msg:
|
||||||
|
default:
|
||||||
|
dropped++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dropped > 0 {
|
||||||
|
ctx := mctx.Annotate(ctx, "msgID", msg.ID, "dropped", dropped)
|
||||||
|
r.params.Logger.WarnString(ctx, "some listening channels full, messages dropped")
|
||||||
|
}
|
||||||
|
|
||||||
|
r.listeningLastID = streamEntry.ID
|
||||||
|
|
||||||
|
r.listeningL.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) readStreamLoop(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
err := r.readStream(ctx)
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
r.params.Logger.Error(ctx, "reading from redis stream", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type listenMsgIterator struct {
|
||||||
|
ch <-chan Message
|
||||||
|
missedMsgs []Message
|
||||||
|
sinceEntryID radix.StreamEntryID
|
||||||
|
cleanup func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *listenMsgIterator) Next(ctx context.Context) (Message, error) {
|
||||||
|
|
||||||
|
if len(i.missedMsgs) > 0 {
|
||||||
|
msg := i.missedMsgs[0]
|
||||||
|
i.missedMsgs = i.missedMsgs[1:]
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return Message{}, ctx.Err()
|
||||||
|
case msg := <-i.ch:
|
||||||
|
|
||||||
|
entryID, err := parseStreamEntryID(msg.ID)
|
||||||
|
if err != nil {
|
||||||
|
return Message{}, fmt.Errorf("parsing Message ID %q: %w", msg.ID, err)
|
||||||
|
|
||||||
|
} else if !i.sinceEntryID.Before(entryID) {
|
||||||
|
// this can happen if someone Appends a Message at the same time
|
||||||
|
// as another calls Listen. The Listener might have already seen
|
||||||
|
// the Message by calling History prior to the stream reader
|
||||||
|
// having processed it and updating listeningLastID.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *listenMsgIterator) Close() error {
|
||||||
|
i.cleanup()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) Listen(
|
||||||
|
ctx context.Context, sinceID string,
|
||||||
|
) (
|
||||||
|
MessageIterator, error,
|
||||||
|
) {
|
||||||
|
|
||||||
|
var sinceEntryID radix.StreamEntryID
|
||||||
|
|
||||||
|
if sinceID != "" {
|
||||||
|
var err error
|
||||||
|
if sinceEntryID, err = parseStreamEntryID(sinceID); err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing sinceID: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan Message, 32)
|
||||||
|
|
||||||
|
r.listeningL.Lock()
|
||||||
|
lastEntryID := r.listeningLastID
|
||||||
|
r.listening[ch] = struct{}{}
|
||||||
|
r.listeningL.Unlock()
|
||||||
|
|
||||||
|
cleanup := func() {
|
||||||
|
r.listeningL.Lock()
|
||||||
|
defer r.listeningL.Unlock()
|
||||||
|
delete(r.listening, ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := r.params.streamKey()
|
||||||
|
start := sinceEntryID.Next().String()
|
||||||
|
end := "+"
|
||||||
|
if lastEntryID != (radix.StreamEntryID{}) {
|
||||||
|
end = lastEntryID.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
var streamEntries []radix.StreamEntry
|
||||||
|
|
||||||
|
err := r.params.Redis.Do(ctx, radix.Cmd(
|
||||||
|
&streamEntries,
|
||||||
|
"XRANGE", key, start, end,
|
||||||
|
))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
cleanup()
|
||||||
|
return nil, fmt.Errorf("retrieving missed stream entries: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
missedMsgs := make([]Message, len(streamEntries))
|
||||||
|
|
||||||
|
for i := range streamEntries {
|
||||||
|
|
||||||
|
msg, err := msgFromStreamEntry(streamEntries[i])
|
||||||
|
if err != nil {
|
||||||
|
cleanup()
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"parsing stream entry %q: %w", streamEntries[i].ID, err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
missedMsgs[i] = msg
|
||||||
|
}
|
||||||
|
|
||||||
|
return &listenMsgIterator{
|
||||||
|
ch: ch,
|
||||||
|
missedMsgs: missedMsgs,
|
||||||
|
sinceEntryID: sinceEntryID,
|
||||||
|
cleanup: cleanup,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *room) Delete(ctx context.Context, id string) error {
|
||||||
|
return r.params.Redis.Do(ctx, radix.Cmd(
|
||||||
|
nil, "XDEL", r.params.streamKey(), id,
|
||||||
|
))
|
||||||
|
}
|
213
srv/src/chat/chat_it_test.go
Normal file
213
srv/src/chat/chat_it_test.go
Normal file
@ -0,0 +1,213 @@
|
|||||||
|
//go:build integration
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package chat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
cfgpkg "github.com/mediocregopher/blog.mediocregopher.com/srv/cfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||||
|
"github.com/mediocregopher/radix/v4"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
const roomTestHarnessMaxMsgs = 10
|
||||||
|
|
||||||
|
type roomTestHarness struct {
|
||||||
|
ctx context.Context
|
||||||
|
room Room
|
||||||
|
allMsgs []Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *roomTestHarness) newMsg(t *testing.T) Message {
|
||||||
|
msg, err := h.room.Append(h.ctx, Message{
|
||||||
|
UserID: UserID{
|
||||||
|
Name: uuid.New().String(),
|
||||||
|
Hash: "0000",
|
||||||
|
},
|
||||||
|
Body: uuid.New().String(),
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
t.Logf("appended message %s", msg.ID)
|
||||||
|
|
||||||
|
h.allMsgs = append([]Message{msg}, h.allMsgs...)
|
||||||
|
|
||||||
|
if len(h.allMsgs) > roomTestHarnessMaxMsgs {
|
||||||
|
h.allMsgs = h.allMsgs[:roomTestHarnessMaxMsgs]
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRoomTestHarness(t *testing.T) *roomTestHarness {
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
cfg := cfgpkg.NewBlogCfg(cfgpkg.Params{
|
||||||
|
Args: []string{}, // prevents the test process args from interfering
|
||||||
|
})
|
||||||
|
|
||||||
|
var radixClient cfgpkg.RadixClient
|
||||||
|
radixClient.SetupCfg(cfg)
|
||||||
|
t.Cleanup(func() { radixClient.Close() })
|
||||||
|
|
||||||
|
if err := cfg.Init(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
roomParams := RoomParams{
|
||||||
|
Logger: mlog.NewLogger(nil),
|
||||||
|
Redis: radixClient.Client,
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
MaxMessages: roomTestHarnessMaxMsgs,
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("creating test Room %q", roomParams.ID)
|
||||||
|
room, err := NewRoom(ctx, roomParams)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
err := radixClient.Client.Do(context.Background(), radix.Cmd(
|
||||||
|
nil, "DEL", roomParams.streamKey(),
|
||||||
|
))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
return &roomTestHarness{ctx: ctx, room: room}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRoom(t *testing.T) {
|
||||||
|
t.Run("history", func(t *testing.T) {
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
numMsgs int
|
||||||
|
limit int
|
||||||
|
}{
|
||||||
|
{numMsgs: 0, limit: 1},
|
||||||
|
{numMsgs: 1, limit: 1},
|
||||||
|
{numMsgs: 2, limit: 1},
|
||||||
|
{numMsgs: 2, limit: 10},
|
||||||
|
{numMsgs: 9, limit: 2},
|
||||||
|
{numMsgs: 9, limit: 3},
|
||||||
|
{numMsgs: 9, limit: 4},
|
||||||
|
{numMsgs: 15, limit: 3},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
|
t.Logf("test: %+v", test)
|
||||||
|
|
||||||
|
h := newRoomTestHarness(t)
|
||||||
|
|
||||||
|
for j := 0; j < test.numMsgs; j++ {
|
||||||
|
h.newMsg(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gotMsgs []Message
|
||||||
|
var cursor string
|
||||||
|
|
||||||
|
for {
|
||||||
|
|
||||||
|
var msgs []Message
|
||||||
|
var err error
|
||||||
|
cursor, msgs, err = h.room.History(h.ctx, HistoryOpts{
|
||||||
|
Cursor: cursor,
|
||||||
|
Limit: test.limit,
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, cursor)
|
||||||
|
|
||||||
|
if len(msgs) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
gotMsgs = append(gotMsgs, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, h.allMsgs, gotMsgs)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
assertNextMsg := func(
|
||||||
|
t *testing.T, expMsg Message,
|
||||||
|
ctx context.Context, it MessageIterator,
|
||||||
|
) {
|
||||||
|
t.Helper()
|
||||||
|
gotMsg, err := it.Next(ctx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, expMsg, gotMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("listen/already_populated", func(t *testing.T) {
|
||||||
|
h := newRoomTestHarness(t)
|
||||||
|
|
||||||
|
msgA, msgB, msgC := h.newMsg(t), h.newMsg(t), h.newMsg(t)
|
||||||
|
_ = msgA
|
||||||
|
_ = msgB
|
||||||
|
|
||||||
|
itFoo, err := h.room.Listen(h.ctx, msgC.ID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer itFoo.Close()
|
||||||
|
|
||||||
|
itBar, err := h.room.Listen(h.ctx, msgA.ID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer itBar.Close()
|
||||||
|
|
||||||
|
msgD := h.newMsg(t)
|
||||||
|
|
||||||
|
// itBar should get msgB and msgC before anything else.
|
||||||
|
assertNextMsg(t, msgB, h.ctx, itBar)
|
||||||
|
assertNextMsg(t, msgC, h.ctx, itBar)
|
||||||
|
|
||||||
|
// now both iterators should give msgD
|
||||||
|
assertNextMsg(t, msgD, h.ctx, itFoo)
|
||||||
|
assertNextMsg(t, msgD, h.ctx, itBar)
|
||||||
|
|
||||||
|
// timeout should be honored
|
||||||
|
{
|
||||||
|
timeoutCtx, timeoutCancel := context.WithTimeout(h.ctx, 1*time.Second)
|
||||||
|
_, errFoo := itFoo.Next(timeoutCtx)
|
||||||
|
_, errBar := itBar.Next(timeoutCtx)
|
||||||
|
timeoutCancel()
|
||||||
|
|
||||||
|
assert.ErrorIs(t, errFoo, context.DeadlineExceeded)
|
||||||
|
assert.ErrorIs(t, errBar, context.DeadlineExceeded)
|
||||||
|
}
|
||||||
|
|
||||||
|
// new message should work
|
||||||
|
{
|
||||||
|
expMsg := h.newMsg(t)
|
||||||
|
|
||||||
|
timeoutCtx, timeoutCancel := context.WithTimeout(h.ctx, 1*time.Second)
|
||||||
|
gotFooMsg, errFoo := itFoo.Next(timeoutCtx)
|
||||||
|
gotBarMsg, errBar := itBar.Next(timeoutCtx)
|
||||||
|
timeoutCancel()
|
||||||
|
|
||||||
|
assert.Equal(t, expMsg, gotFooMsg)
|
||||||
|
assert.NoError(t, errFoo)
|
||||||
|
assert.Equal(t, expMsg, gotBarMsg)
|
||||||
|
assert.NoError(t, errBar)
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("listen/empty", func(t *testing.T) {
|
||||||
|
h := newRoomTestHarness(t)
|
||||||
|
|
||||||
|
it, err := h.room.Listen(h.ctx, "")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
|
msg := h.newMsg(t)
|
||||||
|
assertNextMsg(t, msg, h.ctx, it)
|
||||||
|
})
|
||||||
|
}
|
68
srv/src/chat/user.go
Normal file
68
srv/src/chat/user.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
package chat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/crypto/argon2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// UserID uniquely identifies an individual user who has posted a message in a
|
||||||
|
// Room.
|
||||||
|
type UserID struct {
|
||||||
|
|
||||||
|
// Name will be the user's chosen display name.
|
||||||
|
Name string `json:"name"`
|
||||||
|
|
||||||
|
// Hash will be a hex string generated from a secret only the user knows.
|
||||||
|
Hash string `json:"id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UserIDCalculator is used to calculate UserIDs.
|
||||||
|
type UserIDCalculator struct {
|
||||||
|
|
||||||
|
// Secret is used when calculating UserID Hash salts.
|
||||||
|
Secret []byte
|
||||||
|
|
||||||
|
// TimeCost, MemoryCost, and Threads are used as inputs to the Argon2id
|
||||||
|
// algorithm which is used to generate the Hash.
|
||||||
|
TimeCost, MemoryCost uint32
|
||||||
|
Threads uint8
|
||||||
|
|
||||||
|
// HashLen specifies the number of bytes the Hash should be.
|
||||||
|
HashLen uint32
|
||||||
|
|
||||||
|
// Lock, if set, forces concurrent Calculate calls to occur sequentially.
|
||||||
|
Lock *sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUserIDCalculator returns a UserIDCalculator with sane defaults.
|
||||||
|
func NewUserIDCalculator(secret []byte) *UserIDCalculator {
|
||||||
|
return &UserIDCalculator{
|
||||||
|
Secret: secret,
|
||||||
|
TimeCost: 15,
|
||||||
|
MemoryCost: 128 * 1024,
|
||||||
|
Threads: 2,
|
||||||
|
HashLen: 16,
|
||||||
|
Lock: new(sync.Mutex),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate accepts a name and password and returns the calculated UserID.
|
||||||
|
func (c *UserIDCalculator) Calculate(name, password string) UserID {
|
||||||
|
|
||||||
|
input := fmt.Sprintf("%q:%q", name, password)
|
||||||
|
|
||||||
|
hashB := argon2.IDKey(
|
||||||
|
[]byte(input),
|
||||||
|
c.Secret, // salt
|
||||||
|
c.TimeCost, c.MemoryCost, c.Threads,
|
||||||
|
c.HashLen,
|
||||||
|
)
|
||||||
|
|
||||||
|
return UserID{
|
||||||
|
Name: name,
|
||||||
|
Hash: hex.EncodeToString(hashB),
|
||||||
|
}
|
||||||
|
}
|
26
srv/src/chat/user_test.go
Normal file
26
srv/src/chat/user_test.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package chat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUserIDCalculator(t *testing.T) {
|
||||||
|
|
||||||
|
const name, password = "name", "password"
|
||||||
|
|
||||||
|
c := NewUserIDCalculator([]byte("foo"))
|
||||||
|
|
||||||
|
// calculating with same params twice should result in same UserID
|
||||||
|
userID := c.Calculate(name, password)
|
||||||
|
assert.Equal(t, userID, c.Calculate(name, password))
|
||||||
|
|
||||||
|
// changing either name or password should result in a different Hash
|
||||||
|
assert.NotEqual(t, userID.Hash, c.Calculate(name+"!", password).Hash)
|
||||||
|
assert.NotEqual(t, userID.Hash, c.Calculate(name, password+"!").Hash)
|
||||||
|
|
||||||
|
// changing the secret should change the UserID
|
||||||
|
c.Secret = []byte("bar")
|
||||||
|
assert.NotEqual(t, userID, c.Calculate(name, password))
|
||||||
|
}
|
28
srv/src/chat/util.go
Normal file
28
srv/src/chat/util.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package chat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/radix/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
func parseStreamEntryID(str string) (radix.StreamEntryID, error) {
|
||||||
|
|
||||||
|
split := strings.SplitN(str, "-", 2)
|
||||||
|
if len(split) != 2 {
|
||||||
|
return radix.StreamEntryID{}, errInvalidMessageID
|
||||||
|
}
|
||||||
|
|
||||||
|
time, err := strconv.ParseUint(split[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return radix.StreamEntryID{}, errInvalidMessageID
|
||||||
|
}
|
||||||
|
|
||||||
|
seq, err := strconv.ParseUint(split[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return radix.StreamEntryID{}, errInvalidMessageID
|
||||||
|
}
|
||||||
|
|
||||||
|
return radix.StreamEntryID{Time: time, Seq: seq}, nil
|
||||||
|
}
|
Before Width: | Height: | Size: 68 KiB After Width: | Height: | Size: 68 KiB |
@ -8,6 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
cfgpkg "github.com/mediocregopher/blog.mediocregopher.com/srv/cfg"
|
cfgpkg "github.com/mediocregopher/blog.mediocregopher.com/srv/cfg"
|
||||||
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/chat"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/http"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/http"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/mailinglist"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/mailinglist"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/post"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/post"
|
||||||
@ -44,6 +45,14 @@ func main() {
|
|||||||
httpParams.SetupCfg(cfg)
|
httpParams.SetupCfg(cfg)
|
||||||
ctx = mctx.WithAnnotator(ctx, &httpParams)
|
ctx = mctx.WithAnnotator(ctx, &httpParams)
|
||||||
|
|
||||||
|
var radixClient cfgpkg.RadixClient
|
||||||
|
radixClient.SetupCfg(cfg)
|
||||||
|
defer radixClient.Close()
|
||||||
|
ctx = mctx.WithAnnotator(ctx, &radixClient)
|
||||||
|
|
||||||
|
chatGlobalRoomMaxMsgs := cfg.Int("chat-global-room-max-messages", 1000, "Maximum number of messages the global chat room can retain")
|
||||||
|
chatUserIDCalcSecret := cfg.String("chat-user-id-calc-secret", "", "Secret to use when calculating user ids")
|
||||||
|
|
||||||
// initialization
|
// initialization
|
||||||
err := cfg.Init(ctx)
|
err := cfg.Init(ctx)
|
||||||
|
|
||||||
@ -57,6 +66,10 @@ func main() {
|
|||||||
logger.Fatal(ctx, "initializing", err)
|
logger.Fatal(ctx, "initializing", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx = mctx.Annotate(ctx,
|
||||||
|
"chatGlobalRoomMaxMsgs", *chatGlobalRoomMaxMsgs,
|
||||||
|
)
|
||||||
|
|
||||||
clock := clock.Realtime()
|
clock := clock.Realtime()
|
||||||
|
|
||||||
powStore := pow.NewMemoryStore(clock)
|
powStore := pow.NewMemoryStore(clock)
|
||||||
@ -87,6 +100,19 @@ func main() {
|
|||||||
|
|
||||||
ml := mailinglist.New(mlParams)
|
ml := mailinglist.New(mlParams)
|
||||||
|
|
||||||
|
chatGlobalRoom, err := chat.NewRoom(ctx, chat.RoomParams{
|
||||||
|
Logger: logger.WithNamespace("global-chat-room"),
|
||||||
|
Redis: radixClient.Client,
|
||||||
|
ID: "global",
|
||||||
|
MaxMessages: *chatGlobalRoomMaxMsgs,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(ctx, "initializing global chat room", err)
|
||||||
|
}
|
||||||
|
defer chatGlobalRoom.Close()
|
||||||
|
|
||||||
|
chatUserIDCalc := chat.NewUserIDCalculator([]byte(*chatUserIDCalcSecret))
|
||||||
|
|
||||||
postSQLDB, err := post.NewSQLDB(dataDir)
|
postSQLDB, err := post.NewSQLDB(dataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(ctx, "initializing sql db for post data", err)
|
logger.Fatal(ctx, "initializing sql db for post data", err)
|
||||||
@ -103,6 +129,8 @@ func main() {
|
|||||||
httpParams.PostAssetStore = postAssetStore
|
httpParams.PostAssetStore = postAssetStore
|
||||||
httpParams.PostDraftStore = postDraftStore
|
httpParams.PostDraftStore = postDraftStore
|
||||||
httpParams.MailingList = ml
|
httpParams.MailingList = ml
|
||||||
|
httpParams.GlobalRoom = chatGlobalRoom
|
||||||
|
httpParams.UserIDCalculator = chatUserIDCalc
|
||||||
|
|
||||||
logger.Info(ctx, "listening")
|
logger.Info(ctx, "listening")
|
||||||
httpAPI, err := http.New(httpParams)
|
httpAPI, err := http.New(httpParams)
|
28
srv/src/cmd/userid-calc-cli/main.go
Normal file
28
srv/src/cmd/userid-calc-cli/main.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/chat"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
secret := flag.String("secret", "", "Secret to use when calculating UserIDs")
|
||||||
|
name := flag.String("name", "", "")
|
||||||
|
password := flag.String("password", "", "")
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
calc := chat.NewUserIDCalculator([]byte(*secret))
|
||||||
|
userID := calc.Calculate(*name, *password)
|
||||||
|
|
||||||
|
b, err := json.Marshal(userID)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(string(b))
|
||||||
|
}
|
@ -3,14 +3,17 @@ module github.com/mediocregopher/blog.mediocregopher.com/srv
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/adrg/frontmatter v0.2.0
|
||||||
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21
|
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21
|
||||||
github.com/emersion/go-smtp v0.15.0
|
github.com/emersion/go-smtp v0.15.0
|
||||||
github.com/gomarkdown/markdown v0.0.0-20220510115730-2372b9aa33e5
|
github.com/gomarkdown/markdown v0.0.0-20220510115730-2372b9aa33e5
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/gorilla/feeds v1.1.1
|
github.com/gorilla/feeds v1.1.1 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/gorilla/websocket v1.4.2
|
||||||
|
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||||
github.com/mattn/go-sqlite3 v1.14.8
|
github.com/mattn/go-sqlite3 v1.14.8
|
||||||
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee
|
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee
|
||||||
|
github.com/mediocregopher/radix/v4 v4.0.0-beta.1.0.20210726230805-d62fa1b2e3cb
|
||||||
github.com/rubenv/sql-migrate v0.0.0-20210614095031-55d5740dbbcc
|
github.com/rubenv/sql-migrate v0.0.0-20210614095031-55d5740dbbcc
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
github.com/tilinna/clock v1.1.0
|
github.com/tilinna/clock v1.1.0
|
||||||
@ -19,5 +22,5 @@ require (
|
|||||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
|
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
|
||||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
@ -1,9 +1,12 @@
|
|||||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
|
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
|
github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
|
||||||
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
|
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
|
||||||
github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o=
|
github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o=
|
||||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||||
|
github.com/adrg/frontmatter v0.2.0 h1:/DgnNe82o03riBd1S+ZDjd43wAmC6W35q67NHeLkPd4=
|
||||||
|
github.com/adrg/frontmatter v0.2.0/go.mod h1:93rQCj3z3ZlwyxxpQioRKC1wDLto4aXHrbqIsnH9wmE=
|
||||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||||
@ -65,6 +68,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
|
|||||||
github.com/gorilla/feeds v1.1.1 h1:HwKXxqzcRNg9to+BbvJog4+f3s/xzvtZXICcQGutYfY=
|
github.com/gorilla/feeds v1.1.1 h1:HwKXxqzcRNg9to+BbvJog4+f3s/xzvtZXICcQGutYfY=
|
||||||
github.com/gorilla/feeds v1.1.1/go.mod h1:Nk0jZrvPFZX1OBe5NPiddPw7CfwF6Q9eqzaBbaightA=
|
github.com/gorilla/feeds v1.1.1/go.mod h1:Nk0jZrvPFZX1OBe5NPiddPw7CfwF6Q9eqzaBbaightA=
|
||||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||||
|
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||||
|
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
||||||
@ -112,6 +117,8 @@ github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A
|
|||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee h1:AWRuhgn7iumyhPuxKwed1F1Ri2dXMwxKfp5YIdpnQIY=
|
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee h1:AWRuhgn7iumyhPuxKwed1F1Ri2dXMwxKfp5YIdpnQIY=
|
||||||
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee/go.mod h1:wOZVlnKYvIbkzyCJ3dxy1k40XkirvCd1pisX2O91qoQ=
|
github.com/mediocregopher/mediocre-go-lib/v2 v2.0.0-beta.0.0.20220506011745-cbeee71cb1ee/go.mod h1:wOZVlnKYvIbkzyCJ3dxy1k40XkirvCd1pisX2O91qoQ=
|
||||||
|
github.com/mediocregopher/radix/v4 v4.0.0-beta.1.0.20210726230805-d62fa1b2e3cb h1:7Y2vAC5q44VJzbBUdxRUEqfz88ySJ/6yXXkpQ+sxke4=
|
||||||
|
github.com/mediocregopher/radix/v4 v4.0.0-beta.1.0.20210726230805-d62fa1b2e3cb/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE=
|
||||||
github.com/mitchellh/cli v1.1.2/go.mod h1:6iaV0fGdElS6dPBx0EApTxHrcWvmJphyh2n8YBLPPZ4=
|
github.com/mitchellh/cli v1.1.2/go.mod h1:6iaV0fGdElS6dPBx0EApTxHrcWvmJphyh2n8YBLPPZ4=
|
||||||
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
|
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
|
||||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||||
@ -160,6 +167,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
|
|||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
|
||||||
github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs=
|
github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs=
|
||||||
github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
|
github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||||
@ -244,7 +252,9 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
|
|||||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/cfg"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/cfg"
|
||||||
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/chat"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/http/apiutil"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/http/apiutil"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/mailinglist"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/mailinglist"
|
||||||
"github.com/mediocregopher/blog.mediocregopher.com/srv/post"
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/post"
|
||||||
@ -40,6 +41,9 @@ type Params struct {
|
|||||||
|
|
||||||
MailingList mailinglist.MailingList
|
MailingList mailinglist.MailingList
|
||||||
|
|
||||||
|
GlobalRoom chat.Room
|
||||||
|
UserIDCalculator *chat.UserIDCalculator
|
||||||
|
|
||||||
// PublicURL is the base URL which site visitors can navigate to.
|
// PublicURL is the base URL which site visitors can navigate to.
|
||||||
PublicURL *url.URL
|
PublicURL *url.URL
|
||||||
|
|
||||||
@ -172,9 +176,16 @@ func (a *api) apiHandler() http.Handler {
|
|||||||
mux.Handle("/mailinglist/finalize", a.mailingListFinalizeHandler())
|
mux.Handle("/mailinglist/finalize", a.mailingListFinalizeHandler())
|
||||||
mux.Handle("/mailinglist/unsubscribe", a.mailingListUnsubscribeHandler())
|
mux.Handle("/mailinglist/unsubscribe", a.mailingListUnsubscribeHandler())
|
||||||
|
|
||||||
return apiutil.MethodMux(map[string]http.Handler{
|
mux.Handle("/chat/global/", http.StripPrefix("/chat/global", newChatHandler(
|
||||||
"POST": mux,
|
a.params.GlobalRoom,
|
||||||
})
|
a.params.UserIDCalculator,
|
||||||
|
a.requirePowMiddleware,
|
||||||
|
)))
|
||||||
|
|
||||||
|
// disallowGetMiddleware is used rather than a MethodMux because it has an
|
||||||
|
// exception for websockets, which is needed for chat.
|
||||||
|
return disallowGetMiddleware(mux)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *api) blogHandler() http.Handler {
|
func (a *api) blogHandler() http.Handler {
|
@ -155,7 +155,7 @@ func (a *api) postPostAssetHandler() http.Handler {
|
|||||||
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
id := r.PostFormValue("id")
|
id := r.PostFormValue("id")
|
||||||
if id == "" {
|
if id == "/" {
|
||||||
apiutil.BadRequest(rw, r, errors.New("id is required"))
|
apiutil.BadRequest(rw, r, errors.New("id is required"))
|
||||||
return
|
return
|
||||||
}
|
}
|
211
srv/src/http/chat.go
Normal file
211
srv/src/http/chat.go
Normal file
@ -0,0 +1,211 @@
|
|||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"unicode"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/chat"
|
||||||
|
"github.com/mediocregopher/blog.mediocregopher.com/srv/http/apiutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type chatHandler struct {
|
||||||
|
*http.ServeMux
|
||||||
|
|
||||||
|
room chat.Room
|
||||||
|
userIDCalc *chat.UserIDCalculator
|
||||||
|
|
||||||
|
wsUpgrader websocket.Upgrader
|
||||||
|
}
|
||||||
|
|
||||||
|
func newChatHandler(
|
||||||
|
room chat.Room, userIDCalc *chat.UserIDCalculator,
|
||||||
|
requirePowMiddleware func(http.Handler) http.Handler,
|
||||||
|
) http.Handler {
|
||||||
|
c := &chatHandler{
|
||||||
|
ServeMux: http.NewServeMux(),
|
||||||
|
room: room,
|
||||||
|
userIDCalc: userIDCalc,
|
||||||
|
|
||||||
|
wsUpgrader: websocket.Upgrader{},
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Handle("/history", c.historyHandler())
|
||||||
|
c.Handle("/user-id", requirePowMiddleware(c.userIDHandler()))
|
||||||
|
c.Handle("/append", requirePowMiddleware(c.appendHandler()))
|
||||||
|
c.Handle("/listen", c.listenHandler())
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chatHandler) historyHandler() http.Handler {
|
||||||
|
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
limit, err := apiutil.StrToInt(r.PostFormValue("limit"), 0)
|
||||||
|
if err != nil {
|
||||||
|
apiutil.BadRequest(rw, r, fmt.Errorf("invalid limit parameter: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cursor := r.PostFormValue("cursor")
|
||||||
|
|
||||||
|
cursor, msgs, err := c.room.History(r.Context(), chat.HistoryOpts{
|
||||||
|
Limit: limit,
|
||||||
|
Cursor: cursor,
|
||||||
|
})
|
||||||
|
|
||||||
|
if argErr := (chat.ErrInvalidArg{}); errors.As(err, &argErr) {
|
||||||
|
apiutil.BadRequest(rw, r, argErr.Err)
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
apiutil.InternalServerError(rw, r, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiutil.JSONResult(rw, r, struct {
|
||||||
|
Cursor string `json:"cursor"`
|
||||||
|
Messages []chat.Message `json:"messages"`
|
||||||
|
}{
|
||||||
|
Cursor: cursor,
|
||||||
|
Messages: msgs,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chatHandler) userID(r *http.Request) (chat.UserID, error) {
|
||||||
|
name := r.PostFormValue("name")
|
||||||
|
if l := len(name); l == 0 {
|
||||||
|
return chat.UserID{}, errors.New("name is required")
|
||||||
|
} else if l > 16 {
|
||||||
|
return chat.UserID{}, errors.New("name too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
nameClean := strings.Map(func(r rune) rune {
|
||||||
|
if !unicode.IsPrint(r) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}, name)
|
||||||
|
|
||||||
|
if nameClean != name {
|
||||||
|
return chat.UserID{}, errors.New("name contains invalid characters")
|
||||||
|
}
|
||||||
|
|
||||||
|
password := r.PostFormValue("password")
|
||||||
|
if l := len(password); l == 0 {
|
||||||
|
return chat.UserID{}, errors.New("password is required")
|
||||||
|
} else if l > 128 {
|
||||||
|
return chat.UserID{}, errors.New("password too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.userIDCalc.Calculate(name, password), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chatHandler) userIDHandler() http.Handler {
|
||||||
|
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
userID, err := c.userID(r)
|
||||||
|
if err != nil {
|
||||||
|
apiutil.BadRequest(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
apiutil.JSONResult(rw, r, struct {
|
||||||
|
UserID chat.UserID `json:"userID"`
|
||||||
|
}{
|
||||||
|
UserID: userID,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chatHandler) appendHandler() http.Handler {
|
||||||
|
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
userID, err := c.userID(r)
|
||||||
|
if err != nil {
|
||||||
|
apiutil.BadRequest(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
body := r.PostFormValue("body")
|
||||||
|
|
||||||
|
if l := len(body); l == 0 {
|
||||||
|
apiutil.BadRequest(rw, r, errors.New("body is required"))
|
||||||
|
return
|
||||||
|
|
||||||
|
} else if l > 300 {
|
||||||
|
apiutil.BadRequest(rw, r, errors.New("body too long"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := c.room.Append(r.Context(), chat.Message{
|
||||||
|
UserID: userID,
|
||||||
|
Body: body,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
apiutil.InternalServerError(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
apiutil.JSONResult(rw, r, struct {
|
||||||
|
MessageID string `json:"messageID"`
|
||||||
|
}{
|
||||||
|
MessageID: msg.ID,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chatHandler) listenHandler() http.Handler {
|
||||||
|
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
ctx := r.Context()
|
||||||
|
sinceID := r.FormValue("sinceID")
|
||||||
|
|
||||||
|
conn, err := c.wsUpgrader.Upgrade(rw, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
apiutil.BadRequest(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
it, err := c.room.Listen(ctx, sinceID)
|
||||||
|
|
||||||
|
if errors.As(err, new(chat.ErrInvalidArg)) {
|
||||||
|
apiutil.BadRequest(rw, r, err)
|
||||||
|
return
|
||||||
|
|
||||||
|
} else if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
|
||||||
|
} else if err != nil {
|
||||||
|
apiutil.InternalServerError(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
|
for {
|
||||||
|
|
||||||
|
msg, err := it.Next(ctx)
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
|
||||||
|
} else if err != nil {
|
||||||
|
apiutil.InternalServerError(rw, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.WriteJSON(struct {
|
||||||
|
Message chat.Message `json:"message"`
|
||||||
|
}{
|
||||||
|
Message: msg,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
apiutil.GetRequestLogger(r).Error(ctx, "couldn't write message", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user