Reimplement decision close job

This commit is contained in:
Jan Dittberner 2022-05-21 13:51:17 +02:00
parent 01b95f2253
commit 68d6f4bcdc
6 changed files with 321 additions and 57 deletions

View file

@ -43,6 +43,8 @@ type mailConfig struct {
SMTPHost string `yaml:"host"` SMTPHost string `yaml:"host"`
SMTPPort int `yaml:"port"` SMTPPort int `yaml:"port"`
NotificationSenderAddress string `yaml:"notification_sender_address"` NotificationSenderAddress string `yaml:"notification_sender_address"`
NoticeMailAddress string `yaml:"notice_mail_address"`
VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"`
} }
type httpTimeoutConfig struct { type httpTimeoutConfig struct {
@ -53,8 +55,6 @@ type httpTimeoutConfig struct {
} }
type Config struct { type Config struct {
NoticeMailAddress string `yaml:"notice_mail_address"`
VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"`
DatabaseFile string `yaml:"database_file"` DatabaseFile string `yaml:"database_file"`
ClientCACertificates string `yaml:"client_ca_certificates"` ClientCACertificates string `yaml:"client_ca_certificates"`
ServerCert string `yaml:"server_certificate"` ServerCert string `yaml:"server_certificate"`
@ -64,8 +64,8 @@ type Config struct {
BaseURL string `yaml:"base_url"` BaseURL string `yaml:"base_url"`
HTTPAddress string `yaml:"http_address,omitempty"` HTTPAddress string `yaml:"http_address,omitempty"`
HTTPSAddress string `yaml:"https_address,omitempty"` HTTPSAddress string `yaml:"https_address,omitempty"`
MailConfig mailConfig `yaml:"mail_server"` MailConfig *mailConfig `yaml:"mail_server"`
Timeouts httpTimeoutConfig `yaml:"timeouts,omitempty"` Timeouts *httpTimeoutConfig `yaml:"timeouts,omitempty"`
} }
type configKey int type configKey int
@ -84,7 +84,7 @@ func parseConfig(ctx context.Context, configFile string) (context.Context, error
config := &Config{ config := &Config{
HTTPAddress: "127.0.0.1:8000", HTTPAddress: "127.0.0.1:8000",
HTTPSAddress: "127.0.0.1:8433", HTTPSAddress: "127.0.0.1:8433",
Timeouts: httpTimeoutConfig{ Timeouts: &httpTimeoutConfig{
Idle: httpIdleTimeout, Idle: httpIdleTimeout,
ReadHeader: httpReadHeaderTimeout, ReadHeader: httpReadHeaderTimeout,
Read: httpReadTimeout, Read: httpReadTimeout,

View file

@ -115,7 +115,6 @@ func (r *RemindVotersJob) Stop() {
func (app *application) NewRemindVotersJob( func (app *application) NewRemindVotersJob(
rescheduleChannel chan Job, rescheduleChannel chan Job,
notificationChannel chan NotificationMail,
) Job { ) Job {
return &RemindVotersJob{ return &RemindVotersJob{
infoLog: app.infoLog, infoLog: app.infoLog,
@ -123,31 +122,85 @@ func (app *application) NewRemindVotersJob(
voters: app.voters, voters: app.voters,
decisions: app.decisions, decisions: app.decisions,
reschedule: rescheduleChannel, reschedule: rescheduleChannel,
notify: notificationChannel, notify: app.mailNotifier.notifyChannel,
} }
} }
type closeDecisionsJob struct{} type closeDecisionsJob struct {
timer *time.Timer
infoLog *log.Logger
errorLog *log.Logger
decisions *models.DecisionModel
reschedule chan Job
notify chan NotificationMail
}
func (c *closeDecisionsJob) Schedule() { func (c *closeDecisionsJob) Schedule() {
// TODO implement me var (
panic("implement me") nextDue *time.Time
err error
)
nextDue, err = c.decisions.GetNextPendingDecisionDue()
if err != nil {
c.errorLog.Printf("could not get next pending due date")
c.Stop()
return
}
if nextDue == nil {
c.infoLog.Printf("no next planned execution of CloseDecisionsJob")
c.Stop()
return
}
c.infoLog.Printf("scheduling CloseDecisionsJob for %s", nextDue)
when := time.Until(nextDue.Add(time.Second))
if c.timer == nil {
c.timer = time.AfterFunc(when, c.Run)
return
}
c.timer.Reset(when)
} }
func (c *closeDecisionsJob) Run() { func (c *closeDecisionsJob) Run() {
// TODO implement me c.infoLog.Printf("running CloseDecisionsJob")
panic("implement me")
results, err := c.decisions.CloseDecisions()
if err != nil {
c.errorLog.Printf("closing decisions failed: %v", err)
}
for _, res := range results {
c.notify <- &ClosedDecisionNotification{decision: res}
}
c.reschedule <- c
} }
func (c *closeDecisionsJob) Stop() { func (c *closeDecisionsJob) Stop() {
// TODO implement me if c.timer != nil {
panic("implement me") c.timer.Stop()
c.timer = nil
}
} }
func NewCloseDecisionsJob() Job { func (app *application) NewCloseDecisionsJob(
// TODO implement real job rescheduleChannel chan Job,
) Job {
return &closeDecisionsJob{} return &closeDecisionsJob{
infoLog: app.infoLog,
errorLog: app.errorLog,
decisions: app.decisions,
reschedule: rescheduleChannel,
notify: app.mailNotifier.notifyChannel,
}
} }
type JobScheduler struct { type JobScheduler struct {
@ -167,8 +220,8 @@ func (app *application) NewJobScheduler() {
rescheduleChannel: rescheduleChannel, rescheduleChannel: rescheduleChannel,
} }
app.jobScheduler.addJob(NewCloseDecisionsJob()) app.jobScheduler.addJob(app.NewCloseDecisionsJob(rescheduleChannel))
app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel, app.mailNotifier.notifyChannel)) app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel))
} }
func (js *JobScheduler) Schedule(quitChannel chan struct{}) { func (js *JobScheduler) Schedule(quitChannel chan struct{}) {

View file

@ -47,7 +47,7 @@ type application struct {
ctx context.Context ctx context.Context
jobScheduler *JobScheduler jobScheduler *JobScheduler
mailNotifier *MailNotifier mailNotifier *MailNotifier
mailConfig mailConfig mailConfig *mailConfig
baseURL string baseURL string
} }

View file

@ -12,11 +12,6 @@ import (
"gopkg.in/mail.v2" "gopkg.in/mail.v2"
) )
type headerData struct {
name string
value []string
}
type recipientData struct { type recipientData struct {
field, address, name string field, address, name string
} }
@ -25,12 +20,12 @@ type NotificationContent struct {
template string template string
data interface{} data interface{}
subject string subject string
headers []headerData headers map[string][]string
recipients []recipientData recipients []recipientData
} }
type NotificationMail interface { type NotificationMail interface {
GetNotificationContent() *NotificationContent GetNotificationContent(*mailConfig) *NotificationContent
} }
type MailNotifier struct { type MailNotifier struct {
@ -53,7 +48,7 @@ func (app *application) StartMailNotifier() {
for { for {
select { select {
case notification := <-app.mailNotifier.notifyChannel: case notification := <-app.mailNotifier.notifyChannel:
content := notification.GetNotificationContent() content := notification.GetNotificationContent(app.mailConfig)
mailText, err := content.buildMail(app.baseURL) mailText, err := content.buildMail(app.baseURL)
if err != nil { if err != nil {
@ -63,6 +58,8 @@ func (app *application) StartMailNotifier() {
} }
m := mail.NewMessage() m := mail.NewMessage()
m.SetHeaders(content.headers)
m.SetAddressHeader("From", app.mailNotifier.senderAddress, "CAcert board voting system") m.SetAddressHeader("From", app.mailNotifier.senderAddress, "CAcert board voting system")
for _, recipient := range content.recipients { for _, recipient := range content.recipients {
@ -71,10 +68,6 @@ func (app *application) StartMailNotifier() {
m.SetHeader("Subject", content.subject) m.SetHeader("Subject", content.subject)
for _, header := range content.headers {
m.SetHeader(header.name, header.value...)
}
m.SetBody("text/plain", mailText.String()) m.SetBody("text/plain", mailText.String())
if err = app.mailNotifier.dialer.DialAndSend(m); err != nil { if err = app.mailNotifier.dialer.DialAndSend(m); err != nil {
@ -120,7 +113,7 @@ type RemindVoterNotification struct {
decisions []models.Decision decisions []models.Decision
} }
func (r RemindVoterNotification) GetNotificationContent() *NotificationContent { func (r RemindVoterNotification) GetNotificationContent(*mailConfig) *NotificationContent {
return &NotificationContent{ return &NotificationContent{
template: "remind_voter_mail.txt", template: "remind_voter_mail.txt",
data: struct { data: struct {
@ -131,3 +124,28 @@ func (r RemindVoterNotification) GetNotificationContent() *NotificationContent {
recipients: []recipientData{{"To", r.voter.Reminder, r.voter.Name}}, recipients: []recipientData{{"To", r.voter.Reminder, r.voter.Name}},
} }
} }
type ClosedDecisionNotification struct {
decision *models.ClosedDecision
}
func (c *ClosedDecisionNotification) GetNotificationContent(mc *mailConfig) *NotificationContent {
return &NotificationContent{
template: "closed_motion_mail.txt",
data: c.decision,
subject: fmt.Sprintf("Re: %s - %s - finalised", c.decision.Decision.Tag, c.decision.Decision.Title),
headers: c.getHeaders(),
recipients: []recipientData{c.getRecipient(mc)},
}
}
func (c *ClosedDecisionNotification) getHeaders() map[string][]string {
return map[string][]string{
"References": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)},
"In-Reply-To": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)},
}
}
func (c *ClosedDecisionNotification) getRecipient(mc *mailConfig) recipientData {
return recipientData{field: "To", address: mc.NoticeMailAddress, name: "CAcert board mailing list"}
}

View file

@ -18,6 +18,8 @@ limitations under the License.
package models package models
import ( import (
"database/sql"
"errors"
"fmt" "fmt"
"log" "log"
"time" "time"
@ -108,6 +110,30 @@ func (v VoteChoice) String() string {
return unknownVariant return unknownVariant
} }
type VoteSums struct {
Ayes, Nayes, Abstains int
}
func (v *VoteSums) VoteCount() int {
return v.Ayes + v.Nayes + v.Abstains
}
func (v *VoteSums) TotalVotes() int {
return v.Ayes + v.Nayes
}
func (v *VoteSums) CalculateResult(quorum int, majority float32) (VoteStatus, string) {
if v.VoteCount() < quorum {
return voteStatusDeclined, fmt.Sprintf("Needed quorum of %d has not been reached.", quorum)
}
if (float32(v.Ayes) / float32(v.TotalVotes())) < majority {
return voteStatusDeclined, fmt.Sprintf("Needed majority of %0.2f%% has not been reached.", majority)
}
return voteStatusApproved, "Quorum and majority have been reached"
}
type Decision struct { type Decision struct {
ID int64 `db:"id"` ID int64 `db:"id"`
Proposed time.Time Proposed time.Time
@ -123,6 +149,12 @@ type Decision struct {
VoteType VoteType VoteType VoteType
} }
type ClosedDecision struct {
Decision *Decision
VoteSums *VoteSums
Reasoning string
}
type DecisionModel struct { type DecisionModel struct {
DB *sqlx.DB DB *sqlx.DB
InfoLog *log.Logger InfoLog *log.Logger
@ -164,14 +196,23 @@ VALUES (:proposed, :proponent, :title, :content, :votetype, :status, :due, :prop
return id, nil return id, nil
} }
func (m *DecisionModel) CloseDecisions() error { func (m *DecisionModel) CloseDecisions() ([]*ClosedDecision, error) {
rows, err := m.DB.NamedQuery(` tx, err := m.DB.Beginx()
if err != nil {
return nil, fmt.Errorf("could not start transaction: %w", err)
}
defer func(tx *sqlx.Tx) {
_ = tx.Rollback()
}(tx)
rows, err := tx.NamedQuery(`
SELECT decisions.id, decisions.tag, decisions.proponent, decisions.proposed, decisions.title, decisions.content, SELECT decisions.id, decisions.tag, decisions.proponent, decisions.proposed, decisions.title, decisions.content,
decisions.votetype, decisions.status, decisions.due, decisions.modified decisions.votetype, decisions.status, decisions.due, decisions.modified
FROM decisions FROM decisions
WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now().UTC()}) WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now().UTC()})
if err != nil { if err != nil {
return fmt.Errorf("fetching closable decisions failed: %w", err) return nil, fmt.Errorf("fetching closable decisions failed: %w", err)
} }
defer func(rows *sqlx.Rows) { defer func(rows *sqlx.Rows) {
@ -183,31 +224,143 @@ WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now
for rows.Next() { for rows.Next() {
decision := &Decision{} decision := &Decision{}
if err = rows.StructScan(decision); err != nil { if err = rows.StructScan(decision); err != nil {
return fmt.Errorf("scanning row failed: %w", err) return nil, fmt.Errorf("scanning row failed: %w", err)
} }
if rows.Err() != nil { if rows.Err() != nil {
return fmt.Errorf("row error: %w", err) return nil, fmt.Errorf("row error: %w", err)
} }
decisions = append(decisions, decision) decisions = append(decisions, decision)
} }
results := make([]*ClosedDecision, 0, len(decisions))
var decisionResult *ClosedDecision
for _, decision := range decisions { for _, decision := range decisions {
m.InfoLog.Printf("found closable decision %s", decision.Tag) m.InfoLog.Printf("found closable decision %s", decision.Tag)
if err = m.Close(decision.Tag); err != nil { if decisionResult, err = m.CloseDecision(tx, decision); err != nil {
return fmt.Errorf("closing decision %s failed: %w", decision.Tag, err) return nil, fmt.Errorf("closing decision %s failed: %w", decision.Tag, err)
}
} }
return nil results = append(results, decisionResult)
} }
func (m *DecisionModel) Close(tag string) error { if err = tx.Commit(); err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}
return results, nil
}
func (m *DecisionModel) CloseDecision(tx *sqlx.Tx, d *Decision) (*ClosedDecision, error) {
quorum, majority := d.VoteType.QuorumAndMajority()
var (
voteSums *VoteSums
err error
reasoning string
)
if voteSums, err = m.GetVoteSums(tx, d); err != nil {
return nil, fmt.Errorf("getting vote sums failed: %w", err)
}
d.Status, reasoning = voteSums.CalculateResult(quorum, majority)
result, err := m.DB.NamedExec(
`UPDATE decisions SET status=:status, modified=CURRENT_TIMESTAMP WHERE id=:id`,
d,
)
if err != nil {
return nil, fmt.Errorf("could not execute update query: %w", err)
}
affectedRows, err := result.RowsAffected()
if err != nil {
return nil, fmt.Errorf("could not get affected rows count: %w", err)
}
if affectedRows != 1 {
return nil, fmt.Errorf("unexpected number of rows %d instead of 1", affectedRows)
}
m.InfoLog.Printf("decision %s closed with result %s: reasoning '%s'", d.Tag, d.Status, reasoning)
return &ClosedDecision{d, voteSums, reasoning}, nil
}
func (m *DecisionModel) FindUnVotedDecisionsForVoter(_ *Voter) ([]Decision, error) {
panic("not implemented") panic("not implemented")
} }
func (m *DecisionModel) FindUnVotedDecisionsForVoter(v *Voter) ([]Decision, error) { func (m *DecisionModel) GetVoteSums(tx *sqlx.Tx, d *Decision) (*VoteSums, error) {
panic("not implemented") voteRows, err := tx.NamedQuery(
`SELECT vote, COUNT(vote) FROM votes WHERE decision=$1 GROUP BY vote`,
d.ID,
)
if err != nil {
return nil, fmt.Errorf("fetching vote sums for motion %s failed: %w", d.Tag, err)
}
defer func(voteRows *sqlx.Rows) {
_ = voteRows.Close()
}(voteRows)
sums := &VoteSums{}
for voteRows.Next() {
var (
vote VoteChoice
count int
)
if err = voteRows.Err(); err != nil {
return nil, fmt.Errorf("could not fetch vote sums for motion %s: %w", d.Tag, err)
}
if err = voteRows.Scan(&vote, &count); err != nil {
return nil, fmt.Errorf("could not parse row for vote sums of motion %s: %w", d.Tag, err)
}
switch vote {
case voteAye:
sums.Ayes = count
case voteNaye:
sums.Nayes = count
case voteAbstain:
sums.Abstains = count
}
}
return sums, nil
}
func (m *DecisionModel) GetNextPendingDecisionDue() (*time.Time, error) {
row := m.DB.QueryRow(`SELECT due FROM decisions WHERE status=0 ORDER BY due LIMIT 1`, nil)
if row == nil {
return nil, errors.New("no row returned")
}
if err := row.Err(); err != nil {
return nil, fmt.Errorf("could not retrieve row for next pending decision: %w", err)
}
var due time.Time
if err := row.Scan(&due); err != nil {
if errors.Is(err, sql.ErrNoRows) {
m.InfoLog.Print("no pending decisions")
return nil, nil
}
return nil, fmt.Errorf("parsing result failed: %w", err)
}
return &due, nil
} }

View file

@ -15,7 +15,9 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestDecisionModel_Create(t *testing.T) { func prepareTestDb(t *testing.T) (*sqlx.DB, *log.Logger) {
t.Helper()
testDir := t.TempDir() testDir := t.TempDir()
db, err := sql.Open("sqlite3", path.Join(testDir, "test.sqlite")) db, err := sql.Open("sqlite3", path.Join(testDir, "test.sqlite"))
@ -28,6 +30,12 @@ func TestDecisionModel_Create(t *testing.T) {
err = internal.InitializeDb(dbx.DB, logger) err = internal.InitializeDb(dbx.DB, logger)
require.NoError(t, err) require.NoError(t, err)
return dbx, logger
}
func TestDecisionModel_Create(t *testing.T) {
dbx, logger := prepareTestDb(t)
dm := models.DecisionModel{DB: dbx, InfoLog: logger} dm := models.DecisionModel{DB: dbx, InfoLog: logger}
v := &models.Voter{ v := &models.Voter{
@ -40,3 +48,35 @@ func TestDecisionModel_Create(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEmpty(t, id) assert.NotEmpty(t, id)
} }
func TestDecisionModel_GetNextPendingDecisionDue(t *testing.T) {
dbx, logger := prepareTestDb(t)
dm := models.DecisionModel{DB: dbx, InfoLog: logger}
var (
nextDue *time.Time
err error
)
nextDue, err = dm.GetNextPendingDecisionDue()
assert.NoError(t, err)
assert.Empty(t, nextDue)
v := &models.Voter{
ID: 1, // sqlite does not check referential integrity. Might fail with a foreign key index.
Name: "test voter",
Reminder: "test+voter@example.com",
}
due := time.Now().Add(10 * time.Minute)
_, err = dm.Create(v, models.VoteTypeMotion, "test motion", "I move that we should test more", time.Now(), due)
require.NoError(t, err)
nextDue, err = dm.GetNextPendingDecisionDue()
assert.NoError(t, err)
assert.NotEmpty(t, nextDue)
assert.Equal(t, due.UTC(), *nextDue)
}