This repository was archived by the owner on Aug 6, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 170
/
Copy pathdelete_orphaned_docs_online_check.js
257 lines (236 loc) · 7.49 KB
/
delete_orphaned_docs_online_check.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
const DocstoreManager = require('../app/src/Features/Docstore/DocstoreManager')
const { promisify } = require('util')
const { ObjectId, ReadPreference } = require('mongodb')
const { db, waitForDb } = require('../app/src/infrastructure/mongodb')
const { promiseMapWithLimit } = require('../app/src/util/promises')
const sleep = promisify(setTimeout)
const NOW_IN_S = Date.now() / 1000
const ONE_WEEK_IN_S = 60 * 60 * 24 * 7
const TEN_SECONDS = 10 * 1000
const DRY_RUN = process.env.DRY_RUN === 'true'
if (!process.env.BATCH_LAST_ID) {
console.error('Set BATCH_LAST_ID and re-run.')
process.exit(1)
}
const BATCH_LAST_ID = ObjectId(process.env.BATCH_LAST_ID)
const INCREMENT_BY_S = parseInt(process.env.INCREMENT_BY_S, 10) || ONE_WEEK_IN_S
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE, 10) || 1000
const READ_CONCURRENCY_SECONDARY =
parseInt(process.env.READ_CONCURRENCY_SECONDARY, 10) || 1000
const READ_CONCURRENCY_PRIMARY =
parseInt(process.env.READ_CONCURRENCY_PRIMARY, 10) || 500
const STOP_AT_S = parseInt(process.env.STOP_AT_S, 10) || NOW_IN_S
const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY, 10) || 10
const LET_USER_DOUBLE_CHECK_INPUTS_FOR =
parseInt(process.env.LET_USER_DOUBLE_CHECK_INPUTS_FOR, 10) || TEN_SECONDS
function getSecondsFromObjectId(id) {
return id.getTimestamp().getTime() / 1000
}
async function main() {
await letUserDoubleCheckInputs()
await waitForDb()
let lowerProjectId = BATCH_LAST_ID
let nProjectsProcessedTotal = 0
let nProjectsWithOrphanedDocsTotal = 0
let nDeletedDocsTotal = 0
while (getSecondsFromObjectId(lowerProjectId) <= STOP_AT_S) {
const upperTime = getSecondsFromObjectId(lowerProjectId) + INCREMENT_BY_S
let upperProjectId = ObjectId.createFromTime(upperTime)
const query = {
project_id: {
// exclude edge
$gt: lowerProjectId,
// include edge
$lte: upperProjectId,
},
}
const docs = await db.docs
.find(query, { readPreference: ReadPreference.SECONDARY })
.project({ project_id: 1 })
.sort({ project_id: 1 })
.limit(BATCH_SIZE)
.toArray()
if (docs.length) {
const projectIds = Array.from(
new Set(docs.map(doc => doc.project_id.toString()))
).map(ObjectId)
console.log('Checking projects', JSON.stringify(projectIds))
const { nProjectsWithOrphanedDocs, nDeletedDocs } = await processBatch(
projectIds
)
nProjectsProcessedTotal += projectIds.length
nProjectsWithOrphanedDocsTotal += nProjectsWithOrphanedDocs
nDeletedDocsTotal += nDeletedDocs
if (docs.length === BATCH_SIZE) {
// This project may have more than BATCH_SIZE docs.
const lastDoc = docs[docs.length - 1]
// Resume from after this projectId.
upperProjectId = lastDoc.project_id
}
}
console.error(
'Processed %d projects ' +
'(%d projects with orphaned docs/%d docs deleted) ' +
'until %s',
nProjectsProcessedTotal,
nProjectsWithOrphanedDocsTotal,
nDeletedDocsTotal,
upperProjectId
)
lowerProjectId = upperProjectId
}
}
async function getDeletedProject(projectId, readPreference) {
return await db.deletedProjects.findOne(
{ 'deleterData.deletedProjectId': projectId },
{
// There is no index on .project. Pull down something small.
projection: { 'project._id': 1 },
readPreference,
}
)
}
async function getProject(projectId, readPreference) {
return await db.projects.findOne(
{ _id: projectId },
{
// Pulling down an empty object is fine for differentiating with null.
projection: { _id: 0 },
readPreference,
}
)
}
async function getProjectDocs(projectId) {
return await db.docs
.find(
{ project_id: projectId },
{
projection: { _id: 1 },
readPreference: ReadPreference.PRIMARY,
}
)
.toArray()
}
async function checkProjectExistsWithReadPreference(projectId, readPreference) {
// NOTE: Possible race conditions!
// There are two processes which are racing with our queries:
// 1. project deletion
// 2. project restoring
// For 1. we check the projects collection before deletedProjects.
// If a project were to be delete in this very moment, we should see the
// soft-deleted entry which is created before deleting the projects entry.
// For 2. we check the projects collection after deletedProjects again.
// If a project were to be restored in this very moment, it is very likely
// to see the projects entry again.
// Unlikely edge case: Restore+Deletion in rapid succession.
// We could add locking to the ProjectDeleter for ruling ^ out.
if (await getProject(projectId, readPreference)) {
// The project is live.
return true
}
const deletedProject = await getDeletedProject(projectId, readPreference)
if (deletedProject && deletedProject.project) {
// The project is registered for hard-deletion.
return true
}
if (await getProject(projectId, readPreference)) {
// The project was just restored.
return true
}
// The project does not exist.
return false
}
async function checkProjectExistsOnPrimary(projectId) {
return await checkProjectExistsWithReadPreference(
projectId,
ReadPreference.PRIMARY
)
}
async function checkProjectExistsOnSecondary(projectId) {
return await checkProjectExistsWithReadPreference(
projectId,
ReadPreference.SECONDARY
)
}
async function processBatch(projectIds) {
const doubleCheckProjectIdsOnPrimary = []
let nDeletedDocs = 0
async function checkProjectOnSecondary(projectId) {
if (await checkProjectExistsOnSecondary(projectId)) {
// Finding a project with secondary confidence is sufficient.
return
}
// At this point, the secondaries deem this project as having orphaned docs.
doubleCheckProjectIdsOnPrimary.push(projectId)
}
const projectsWithOrphanedDocs = []
async function checkProjectOnPrimary(projectId) {
if (await checkProjectExistsOnPrimary(projectId)) {
// The project is actually live.
return
}
projectsWithOrphanedDocs.push(projectId)
const docs = await getProjectDocs(projectId)
nDeletedDocs += docs.length
console.log(
'Deleted project %s has %s orphaned docs: %s',
projectId,
docs.length,
JSON.stringify(docs.map(doc => doc._id))
)
}
await promiseMapWithLimit(
READ_CONCURRENCY_SECONDARY,
projectIds,
checkProjectOnSecondary
)
await promiseMapWithLimit(
READ_CONCURRENCY_PRIMARY,
doubleCheckProjectIdsOnPrimary,
checkProjectOnPrimary
)
if (!DRY_RUN) {
await promiseMapWithLimit(
WRITE_CONCURRENCY,
projectsWithOrphanedDocs,
DocstoreManager.promises.destroyProject
)
}
const nProjectsWithOrphanedDocs = projectsWithOrphanedDocs.length
return { nProjectsWithOrphanedDocs, nDeletedDocs }
}
async function letUserDoubleCheckInputs() {
console.error(
'Options:',
JSON.stringify(
{
BATCH_LAST_ID,
BATCH_SIZE,
DRY_RUN,
INCREMENT_BY_S,
STOP_AT_S,
READ_CONCURRENCY_SECONDARY,
READ_CONCURRENCY_PRIMARY,
WRITE_CONCURRENCY,
LET_USER_DOUBLE_CHECK_INPUTS_FOR,
},
null,
2
)
)
console.error(
'Waiting for you to double check inputs for',
LET_USER_DOUBLE_CHECK_INPUTS_FOR,
'ms'
)
await sleep(LET_USER_DOUBLE_CHECK_INPUTS_FOR)
}
main()
.then(() => {
console.error('Done.')
process.exit(0)
})
.catch(error => {
console.error({ error })
process.exit(1)
})