~sircmpwn/git.sr.ht

git.sr.ht/gitsrht-update-hook/stage-3.go -rw-r--r-- 4.6 KiB
5c21171dDrew DeVault API: Implement mutation { deleteArtifact } 22 hours ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"os"
	"path/filepath"
	"strconv"

	_ "github.com/lib/pq"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func stage3() {
	var context PushContext
	contextJson, ctxOk := os.LookupEnv("SRHT_PUSH_CTX")
	pushUuid, pushOk := os.LookupEnv("SRHT_PUSH")
	if !ctxOk || !pushOk {
		logger.Fatal("Missing required variables in environment, " +
			"configuration error?")
	}

	logger.Printf("Running stage 3 for push %s", pushUuid)

	if err := json.Unmarshal([]byte(contextJson), &context); err != nil {
		logger.Fatalf("unmarshal SRHT_PUSH_CTX: %v", err)
	}

	db, err := sql.Open("postgres", pgcs)
	if err != nil {
		logger.Fatalf("Failed to open a database connection: %v", err)
	}

	var subscriptions []WebhookSubscription
	var deliveries []WebhookDelivery
	deliveriesJsonLen, err := strconv.Atoi(os.Args[1])
	if err != nil {
		logger.Fatalf("deliveriesJson length \"%v\": %v", string(os.Args[1]), err)
	}
	deliveriesJson := make([]byte, deliveriesJsonLen)
	if read, err := os.Stdin.Read(deliveriesJson); read != len(deliveriesJson) {
		logger.Fatalf("Failed to read deliveries: %v, %v", read, err)
	}
	if err := json.Unmarshal(deliveriesJson, &deliveries); err != nil {
		logger.Fatalf("Unable to unmarhsal delivery array: %v", err)
	}

	payloadLen, err := strconv.Atoi(os.Args[2])
	if err != nil {
		logger.Fatalf("payload length \"%v\": %v", string(os.Args[2]), err)
	}
	payload := make([]byte, payloadLen)
	if read, err := os.Stdin.Read(payload); read != len(payload) {
		logger.Fatalf("Failed to read payload: %v, %v", read, err)
	}

	var decoded WebhookPayload
	err = json.Unmarshal(payload, &decoded)
	if err != nil {
		logger.Fatalf("Failed to decode payload: %v\n", err)
	}

	var rows *sql.Rows
	if rows, err = db.Query(`
			SELECT id, url, events
			FROM repo_webhook_subscription rws
			WHERE rws.repo_id = $1
				AND rws.events LIKE '%repo:post-update%'
				AND rws.sync = false`, context.Repo.Id); err != nil {
		logger.Fatalf("Error fetching webhooks: %v", err)
	}
	defer rows.Close()

	for i := 0; rows.Next(); i++ {
		var whs WebhookSubscription
		if err = rows.Scan(&whs.Id, &whs.Url, &whs.Events); err != nil {
			logger.Fatalf("Scanning webhook rows: %v", err)
		}
		subscriptions = append(subscriptions, whs)
	}

	logger.Printf("Making %d deliveries and recording %d from stage 2",
		len(subscriptions), len(deliveries))

	deliveries = append(deliveries, deliverWebhooks(
		subscriptions, payload, false)...)
	for _, delivery := range deliveries {
		if _, err := db.Exec(`
			INSERT INTO repo_webhook_delivery (
				uuid,
				created,
				event,
				url,
				payload,
				payload_headers,
				response,
				response_status,
				response_headers,
				subscription_id
			) VALUES (
				$1, NOW() AT TIME ZONE 'UTC', 'repo:post-update',
				$2, $3, $4, $5, $6, $7, $8
			);
		`, delivery.UUID, delivery.Url,
			delivery.Payload, delivery.Headers,
			delivery.Response, delivery.ResponseStatus, delivery.ResponseHeaders,
			delivery.SubscriptionId); err != nil {

			logger.Fatalf("Error inserting webhook delivery: %v", err)
		}
	}

	logger.Printf("Delivered %d webhooks, recorded %d deliveries",
		len(subscriptions), len(deliveries))

	if _, ok := config.Get("objects", "s3-upstream"); ok {
		deleteArtifacts(&context, db, &decoded)
	}
}

func deleteArtifacts(ctx *PushContext, db *sql.DB, payload *WebhookPayload) {
	s3upstream, _ := config.Get("objects", "s3-upstream")
	s3accessKey, _ := config.Get("objects", "s3-access-key")
	s3secretKey, _ := config.Get("objects", "s3-secret-key")
	s3bucket, _ := config.Get("git.sr.ht", "s3-bucket")
	s3prefix, _ := config.Get("git.sr.ht", "s3-prefix")

	minioClient, err := minio.New(s3upstream, &minio.Options{
		Creds:  credentials.NewStaticV4(s3accessKey, s3secretKey, ""),
		Secure: true,
	})
	if err != nil {
		logger.Fatalf("Error connecting to S3: %e", err)
	}

	for _, ref := range payload.Refs {
		if ref.New != nil || ref.Old == nil {
			continue
		}

		var rows *sql.Rows
		if rows, err = db.Query(`
			DELETE FROM artifacts
			WHERE repo_id = $1 AND commit = $2
			RETURNING filename;`, ctx.Repo.Id, ref.Old.Id); err != nil {

			logger.Fatalf("Error fetching artifacts: %v", err)
		}
		defer rows.Close()

		for rows.Next() {
			var filename string
			if err = rows.Scan(&filename); err != nil {
				logger.Fatalf("Scanning artifact rows: %e", err)
			}
			path := filepath.Join(s3prefix, "artifacts",
				"~"+ctx.Repo.OwnerName, ctx.Repo.Name, filename)
			logger.Printf("Deleting S3 object %s", path)
			err = minioClient.RemoveObject(context.TODO(), s3bucket, path,
				minio.RemoveObjectOptions{})
			if err != nil {
				logger.Printf("Error removing S3 object: %e", err)
			}
		}
	}
}