Skip to content

Commit

Permalink
[BREAKING] [FIX] ObjectStore implementations prior to this one used a…
Browse files Browse the repository at this point in the history
… sha256 library that generated incorrect digests for files 512MB and larger. These digests prevent other tools from accessing objects `put` by the JavaScript ObjectStore implementation. Included in the fix is a migration tool that walks the specified objectstore and calculates hashes, if found to be a match for the bad-hashes, it updates the hash in the metadata entry for the object.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Feb 7, 2025
1 parent 2f81ec9 commit edc3dbe
Show file tree
Hide file tree
Showing 4 changed files with 573 additions and 18 deletions.
159 changes: 159 additions & 0 deletions bin/fix-os-hashes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { parse } from "https://deno.land/[email protected]/flags/mod.ts";
import { ObjectStoreImpl, ServerObjectInfo } from "../jetstream/objectstore.ts";
import {
Base64UrlPaddedCodec,
connect,
ConnectionOptions,
credsAuthenticator,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts";
import {
SHA256 as BAD_SHA256,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/refs/tags/v1.29.1/nats-base-client/sha256.js";
import { consumerOpts } from "../jetstream/internal_mod.ts";
import { sha256 } from "../nats-base-client/js-sha256.js";

const argv = parse(
Deno.args,
{
alias: {
"s": ["server"],
"f": ["creds"],
"b": ["bucket"],
},
default: {
s: "127.0.0.1:4222",
c: 1,
i: 0,
},
boolean: ["check"],
string: ["server", "creds", "bucket"],
},
);

const copts = { servers: argv.s } as ConnectionOptions;

if (argv.h || argv.help) {
console.log(
"Usage: fix-os [-s server] [--creds=/path/file.creds] [--check] --bucket=name",
);
console.log(
"\nThis tool fixes metadata entries in an object store that were written",
);
console.log(
"with recalculated hashes. Please backup your object stores",
);
console.log("before using this tool.");

Deno.exit(1);
}

if (argv.creds) {
const data = await Deno.readFile(argv.creds);
copts.authenticator = credsAuthenticator(data);
}

if (!argv.bucket) {
console.log("--bucket is required");
Deno.exit(1);
}

const nc = await connect(copts);

const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
const lister = jsm.streams.listObjectStores();
let found = false;
const streamName = `OBJ_${argv.bucket}`;
for await (const oss of lister) {
if (oss.streamInfo.config.name === streamName) {
found = true;
break;
}
}
if (!found) {
console.log(`bucket '${argv.bucket}' was not found`);
Deno.exit(1);
}
const os = await js.views.os(argv.bucket) as ObjectStoreImpl;
await fixDigests(os);

async function fixDigests(os: ObjectStoreImpl): Promise<void> {
let fixes = 0;
const entries = await os.list();
for (const entry of entries) {
const badSha = new BAD_SHA256();
const sha = sha256.create();

const oc = consumerOpts();
const subj = `$O.${os.name}.C.${entry.nuid}`;
let needsFixing = false;

const sub = await js.subscribe(subj, oc);
for await (const m of sub) {
if (m.data.length > 0) {
badSha.update(m.data);
sha.update(m.data);
}
if (m.info.pending === 0) {
const hash = sha.digest();
const badHash = badSha.digest();
for (let i = 0; i < hash.length; i++) {
if (hash[i] !== badHash[i]) {
needsFixing = true;
fixes++;
break;
}
}
break;
}
}
sub.unsubscribe();
if (argv.check) {
continue;
}
if (needsFixing) {
const metaSubject = os._metaSubject(entry.name);
const m = await os.jsm.streams.getMessage(os.stream, {
last_by_subj: metaSubject,
});
const info = m.json<ServerObjectInfo>();
const digest = Base64UrlPaddedCodec.encode(sha.digest());
info.digest = `SHA-256=${digest}`;
try {
await js.publish(metaSubject, JSON.stringify(info));
} catch (err) {
console.error(`[ERR] failed to update ${metaSubject}: ${err.message}`);
continue;
}
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${metaSubject}: ${err.message} - new entry was added`,
);
}
}
}

const verb = argv.check ? "are" : "were";
console.log(
`${fixes} digest fixes ${verb} required on bucket ${argv.bucket}`,
);
}

await nc.drain();
19 changes: 7 additions & 12 deletions jetstream/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
PubAck,
} from "./types.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
import { SHA256 } from "../nats-base-client/sha256.js";
import { sha256 } from "../nats-base-client/js-sha256.js";

import {
MsgHdrs,
Expand Down Expand Up @@ -357,7 +357,7 @@ export class ObjectStoreImpl implements ObjectStore {
const db = new DataBuffer();
try {
const reader = rs ? rs.getReader() : null;
const sha = new SHA256();
const sha = sha256.create();

while (true) {
const { done, value } = reader
Expand All @@ -378,10 +378,8 @@ export class ObjectStoreImpl implements ObjectStore {

// prepare the metadata
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
info.digest = `${digestType}${digest}${padding}`;
const digest = Base64UrlPaddedCodec.encode(sha.digest());
info.digest = `${digestType}${digest}`;
info.deleted = false;

// trailing md for the object
Expand Down Expand Up @@ -543,7 +541,7 @@ export class ObjectStoreImpl implements ObjectStore {

const oc = consumerOpts();
oc.orderedConsumer();
const sha = new SHA256();
const sha = sha256.create();
const subj = `$O.${this.name}.C.${info.nuid}`;
const sub = await this.js.subscribe(subj, oc);
(async () => {
Expand All @@ -553,11 +551,8 @@ export class ObjectStoreImpl implements ObjectStore {
controller!.enqueue(jm.data);
}
if (jm.info.pending === 0) {
const hash = sha.digest("base64");
// go pads the hash - which should be multiple of 3 - otherwise pads with '='
const pad = hash.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
const digest = `${digestType}${hash}${padding}`;
const hash = Base64UrlPaddedCodec.encode(sha.digest());
const digest = `${digestType}${hash}`;
if (digest !== info.digest) {
controller!.error(
new Error(
Expand Down
10 changes: 4 additions & 6 deletions jetstream/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { crypto } from "https://deno.land/[email protected]/crypto/mod.ts";
import { ObjectInfo, ObjectStoreMeta, StorageType } from "../mod.ts";
import { connect, Empty, headers, nanos, StringCodec } from "../../src/mod.ts";
import { equals } from "https://deno.land/[email protected]/bytes/mod.ts";
import { SHA256 } from "../../nats-base-client/sha256.js";
import { sha256 } from "../../nats-base-client/js-sha256.js";
import { Base64UrlPaddedCodec } from "../../nats-base-client/base64.ts";
import { digestType } from "../objectstore.ts";

Expand Down Expand Up @@ -80,12 +80,10 @@ function makeData(n: number): Uint8Array {
}

function digest(data: Uint8Array): string {
const sha = new SHA256();
const sha = sha256.create();
sha.update(data);
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
return `${digestType}${digest}${padding}`;
const digest = Base64UrlPaddedCodec.encode(sha.digest());
return `${digestType}${digest}`;
}

Deno.test("objectstore - basics", async () => {
Expand Down
Loading

0 comments on commit edc3dbe

Please sign in to comment.