Merge pull request #745 from nats-io/fix-sha256
[BREAKING] [FIX] replace sha256 library used by the client for objectstore entries and migration tool
aricart authored Feb 7, 2025
2 parents 2f81ec9 + d1cfea9 commit 7624f0e
Showing 9 changed files with 723 additions and 384 deletions.
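This commit replaces the client's internal SHA256 implementation with the js-sha256 library, switches object store digest encoding to Base64UrlPaddedCodec, adds a digest parser, and ships a migration tool (bin/fix-os-hashes.ts) for entries whose hashes were computed with the old, buggy library. A minimal sketch of the new digest construction, assuming the import paths as they appear in the diffs below and an illustrative payload:

// sketch only: mirrors the putter-side digest path from the objectstore.ts diff below
import { sha256 } from "../nats-base-client/js-sha256.js";
import { Base64UrlPaddedCodec } from "../nats-base-client/base64.ts";

const data = new TextEncoder().encode("hello object store");
const sha = sha256.create();
sha.update(data);
// stored digests are "SHA-256=" followed by the padded base64url-encoded hash
const digest = `SHA-256=${Base64UrlPaddedCodec.encode(sha.digest())}`;
console.log(digest);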
189 changes: 189 additions & 0 deletions bin/fix-os-hashes.ts
@@ -0,0 +1,189 @@
/*
* Copyright 2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { parse } from "https://deno.land/[email protected]/flags/mod.ts";
import { ObjectStoreImpl, ServerObjectInfo } from "../jetstream/objectstore.ts";
import {
connect,
ConnectionOptions,
credsAuthenticator,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts";
import { Base64UrlPaddedCodec } from "../nats-base-client/base64.ts";
import {
SHA256 as BAD_SHA256,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/refs/tags/v1.29.1/nats-base-client/sha256.js";
import { consumerOpts } from "../jetstream/mod.ts";
import { sha256 } from "https://raw.githubusercontent.com/nats-io/nats.deno/refs/tags/v1.29.1/nats-base-client/sha256.js";
import { checkSha256, parseSha256 } from "../jetstream/sha_digest.parser.ts";

const argv = parse(
Deno.args,
{
alias: {
"s": ["server"],
"f": ["creds"],
"b": ["bucket"],
},
default: {
s: "127.0.0.1:4222",
c: 1,
i: 0,
},
boolean: ["check"],
string: ["server", "creds", "bucket"],
},
);

const copts = { servers: argv.s } as ConnectionOptions;

if (argv.h || argv.help) {
console.log(
"Usage: fix-os [-s server] [--creds=/path/file.creds] [--check] --bucket=name",
);
console.log(
"\nThis tool fixes metadata entries in an object store that were written",
);
console.log(
"with hashes that were calculated incorrectly due to a bug in the sha256 library.",
);
console.log("Please backup your object stores before using this tool.");

Deno.exit(1);
}

if (argv.creds) {
const data = await Deno.readFile(argv.creds);
copts.authenticator = credsAuthenticator(data);
}

if (!argv.bucket) {
console.log("--bucket is required");
Deno.exit(1);
}

const nc = await connect(copts);

const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
const lister = jsm.streams.listObjectStores();
let found = false;
const streamName = `OBJ_${argv.bucket}`;
for await (const oss of lister) {
if (oss.streamInfo.config.name === streamName) {
found = true;
break;
}
}
if (!found) {
console.log(`bucket '${argv.bucket}' was not found`);
Deno.exit(1);
}
const os = await js.views.os(argv.bucket) as ObjectStoreImpl;
await fixDigests(os);

async function fixDigests(os: ObjectStoreImpl): Promise<void> {
let fixes = 0;
const entries = await os.list();
for (const entry of entries) {
if (!entry.digest.startsWith("SHA-256=")) {
console.error(
`ignoring entry ${entry.name} - unknown objectstore digest:`,
entry.digest,
);
continue;
}
// plain digest string
const digest = entry.digest.substring(8);
const parsedDigest = parseSha256(digest);
if (parsedDigest === null) {
console.error(
`ignoring entry ${entry.name} - unable to parse digest:`,
digest,
);
continue;
}

const badSha = new BAD_SHA256();
const sha = sha256.create();
let badHash = new Uint8Array(0);
let hash = new Uint8Array(0);

const oc = consumerOpts();
oc.orderedConsumer();

const subj = `$O.${os.name}.C.${entry.nuid}`;
let needsFixing = false;

const sub = await js.subscribe(subj, oc);
for await (const m of sub) {
if (m.data.length > 0) {
badSha.update(m.data);
sha.update(m.data);
}
if (m.info.pending === 0) {
badHash = badSha.digest();
hash = sha.digest();
break;
}
}
sub.unsubscribe();

if (checkSha256(parsedDigest, badHash)) {
// this one could be bad
if (!checkSha256(badHash, hash)) {
console.log(
`[WARN] entry ${entry.name} has a bad hash: ${
Base64UrlPaddedCodec.encode(badHash)
} - should be ${Base64UrlPaddedCodec.encode(hash)}`,
);
needsFixing = true;
fixes++;
}
}

if (argv.check) {
continue;
}
if (needsFixing) {
const metaSubject = os._metaSubject(entry.name);
const m = await os.jsm.streams.getMessage(os.stream, {
last_by_subj: metaSubject,
});
const info = m.json<ServerObjectInfo>();
const digest = Base64UrlPaddedCodec.encode(hash);
info.digest = `SHA-256=${digest}`;
try {
await js.publish(metaSubject, JSON.stringify(info));
} catch (err) {
console.error(`[ERR] failed to update ${metaSubject}: ${err.message}`);
continue;
}
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${metaSubject}: ${err.message} - new entry was added`,
);
}
}
}

const verb = argv.check ? "are" : "were";
console.log(
`${fixes} digest fixes ${verb} required on bucket ${argv.bucket}`,
);
}

await nc.drain();
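For reference, a possible invocation of the migration tool under Deno; the server address, creds path, and bucket name are placeholders, and the permission flags are an assumption. Run with --check first to only report, and back up the object store before fixing, as the help text above advises:

deno run --allow-net --allow-read bin/fix-os-hashes.ts -s 127.0.0.1:4222 --creds /path/file.creds --check --bucket=my-bucket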
32 changes: 18 additions & 14 deletions jetstream/objectstore.ts
@@ -1,5 +1,5 @@
/*
- * Copyright 2022-2023 The NATS Authors
+ * Copyright 2022-2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -36,7 +36,7 @@ import {
PubAck,
} from "./types.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
- import { SHA256 } from "../nats-base-client/sha256.js";
+ import { sha256 } from "../nats-base-client/js-sha256.js";

import {
MsgHdrs,
@@ -55,6 +55,7 @@ import {
} from "./jsapi_types.ts";
import { JsMsg } from "./jsmsg.ts";
import { PubHeaders } from "./jsclient.ts";
+ import { checkSha256, parseSha256 } from "./sha_digest.parser.ts";

export const osPrefix = "OBJ_";
export const digestType = "SHA-256=";
@@ -357,7 +358,7 @@ export class ObjectStoreImpl implements ObjectStore {
const db = new DataBuffer();
try {
const reader = rs ? rs.getReader() : null;
- const sha = new SHA256();
+ const sha = sha256.create();

while (true) {
const { done, value } = reader
@@ -378,10 +379,8 @@

// prepare the metadata
info.mtime = new Date().toISOString();
- const digest = sha.digest("base64");
- const pad = digest.length % 3;
- const padding = pad > 0 ? "=".repeat(pad) : "";
- info.digest = `${digestType}${digest}${padding}`;
+ const digest = Base64UrlPaddedCodec.encode(sha.digest());
+ info.digest = `${digestType}${digest}`;
info.deleted = false;

// trailing md for the object
@@ -527,6 +526,16 @@ export class ObjectStoreImpl implements ObjectStore {
return os.get(ln);
}

+ if (!info.digest.startsWith(digestType)) {
+ return Promise.reject(new Error(`unknown digest type: ${info.digest}`));
+ }
+ const digest = parseSha256(info.digest.substring(8));
+ if (digest === null) {
+ return Promise.reject(
+ new Error(`unable to parse digest: ${info.digest}`),
+ );
+ }

const d = deferred<Error | null>();

const r: Partial<ObjectResult> = {
@@ -543,7 +552,7 @@

const oc = consumerOpts();
oc.orderedConsumer();
- const sha = new SHA256();
+ const sha = sha256.create();
const subj = `$O.${this.name}.C.${info.nuid}`;
const sub = await this.js.subscribe(subj, oc);
(async () => {
@@ -553,12 +562,7 @@
controller!.enqueue(jm.data);
}
if (jm.info.pending === 0) {
- const hash = sha.digest("base64");
- // go pads the hash - which should be multiple of 3 - otherwise pads with '='
- const pad = hash.length % 3;
- const padding = pad > 0 ? "=".repeat(pad) : "";
- const digest = `${digestType}${hash}${padding}`;
- if (digest !== info.digest) {
+ if (!checkSha256(digest, sha.digest())) {
controller!.error(
new Error(
`received a corrupt object, digests do not match received: ${info.digest} calculated ${digest}`,
104 changes: 104 additions & 0 deletions jetstream/sha_digest.parser.ts
@@ -0,0 +1,104 @@
/*
* Copyright 2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export function parseSha256(s: string): Uint8Array | null {
return toByteArray(s);
}

function isHex(s: string): boolean {
// contains valid hex characters only
const hexRegex = /^[0-9A-Fa-f]+$/;
if (!hexRegex.test(s)) {
// non-hex characters
return false;
}

// check for mixed-case strings - paranoid base64 sneaked in
const isAllUpperCase = /^[0-9A-F]+$/.test(s);
const isAllLowerCase = /^[0-9a-f]+$/.test(s);
if (!(isAllUpperCase || isAllLowerCase)) {
return false;
}

// ensure the input string length is even
return s.length % 2 === 0;
}

function isBase64(s: string): boolean {
// test for padded or normal base64
return /^[A-Za-z0-9\-_]*(={0,2})?$/.test(s) ||
/^[A-Za-z0-9+/]*(={0,2})?$/.test(s);
}

function detectEncoding(input: string): "hex" | "b64" | "" {
// hex is more reliable to flush out...
if (isHex(input)) {
return "hex";
} else if (isBase64(input)) {
return "b64";
}
return "";
}

function hexToByteArray(s: string): Uint8Array {
if (s.length % 2 !== 0) {
throw new Error("hex string must have an even length");
}
const a = new Uint8Array(s.length / 2);
for (let i = 0; i < s.length; i += 2) {
// parse hex two chars at a time
a[i / 2] = parseInt(s.substring(i, i + 2), 16);
}
return a;
}

function base64ToByteArray(s: string): Uint8Array {
// could be url friendly
s = s.replace(/-/g, "+");
s = s.replace(/_/g, "/");
const sbin = atob(s);
return Uint8Array.from(sbin, (c) => c.charCodeAt(0));
}

function toByteArray(input: string): Uint8Array | null {
const encoding = detectEncoding(input);
switch (encoding) {
case "hex":
return hexToByteArray(input);
case "b64":
return base64ToByteArray(input);
}
return null;
}

export function checkSha256(
a: string | Uint8Array,
b: string | Uint8Array,
): boolean {
const aBytes = typeof a === "string" ? parseSha256(a) : a;
const bBytes = typeof b === "string" ? parseSha256(b) : b;
if (aBytes === null || bBytes === null) {
return false;
}
if (aBytes.length !== bBytes.length) {
return false;
}
for (let i = 0; i < aBytes.length; i++) {
if (aBytes[i] !== bBytes[i]) {
return false;
}
}
return true;
}
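A quick sketch of how the new parser normalizes encodings, using the well-known SHA-256 digest of the empty string in both hex and standard base64 (the relative import path is an assumption):

import { checkSha256, parseSha256 } from "./sha_digest.parser.ts";

// hex and base64 encodings of the same 32-byte digest (SHA-256 of "")
const hex = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
const b64 = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=";

console.log(parseSha256(hex)?.length); // 32
console.log(checkSha256(hex, b64)); // true - same digest, different encodings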
2 changes: 1 addition & 1 deletion jetstream/tests/consumers_ordered_test.ts
@@ -979,7 +979,7 @@ Deno.test("ordered consumers - consume reset", async () => {
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;

// after the first message others will get published
- let iter = await c.consume({ max_messages: 11, expires: 5000 });
+ const iter = await c.consume({ max_messages: 11, expires: 5000 });
countResets(iter).catch();
const sequences = [];
for await (const m of iter) {
2 changes: 1 addition & 1 deletion jetstream/tests/kv_test.ts
@@ -2166,7 +2166,7 @@ Deno.test("kv - maxBucketSize doesn't override max_bytes", async () => {
});

Deno.test("kv - maxBucketSize doesn't override max_bytes", async () => {
- let { ns, nc } = await setup(
+ const { ns, nc } = await setup(
jetstreamServerConf({}),
);
const js = nc.jetstream();
