diff --git a/enumerator.go b/enumerator.go index 972a224..c90bc87 100644 --- a/enumerator.go +++ b/enumerator.go @@ -14,125 +14,21 @@ package zap -import ( - "bytes" +import "github.com/blevesearch/vellum" - "github.com/blevesearch/vellum" -) +var maxItrsPerBasicEnum = 64 -// enumerator provides an ordered traversal of multiple vellum -// iterators. Like JOIN of iterators, the enumerator produces a -// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC, -// then iteratorIndex ASC, where the same key might be seen or -// repeated across multiple child iterators. -type enumerator struct { - itrs []vellum.Iterator - currKs [][]byte - currVs []uint64 - - lowK []byte - lowIdxs []int - lowCurr int -} - -// newEnumerator returns a new enumerator over the vellum Iterators -func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) { - rv := &enumerator{ - itrs: itrs, - currKs: make([][]byte, len(itrs)), - currVs: make([]uint64, len(itrs)), - lowIdxs: make([]int, 0, len(itrs)), - } - for i, itr := range rv.itrs { - rv.currKs[i], rv.currVs[i] = itr.Current() - } - rv.updateMatches(false) - if rv.lowK == nil && len(rv.lowIdxs) == 0 { - return rv, vellum.ErrIteratorDone - } - return rv, nil -} - -// updateMatches maintains the low key matches based on the currKs -func (m *enumerator) updateMatches(skipEmptyKey bool) { - m.lowK = nil - m.lowIdxs = m.lowIdxs[:0] - m.lowCurr = 0 - - for i, key := range m.currKs { - if (key == nil && m.currVs[i] == 0) || // in case of empty iterator - (len(key) == 0 && skipEmptyKey) { // skip empty keys - continue - } - - cmp := bytes.Compare(key, m.lowK) - if cmp < 0 || len(m.lowIdxs) == 0 { - // reached a new low - m.lowK = key - m.lowIdxs = m.lowIdxs[:0] - m.lowIdxs = append(m.lowIdxs, i) - } else if cmp == 0 { - m.lowIdxs = append(m.lowIdxs, i) - } - } -} - -// Current returns the enumerator's current key, iterator-index, and -// value. If the enumerator is not pointing at a valid value (because -// Next returned an error previously), Current will return nil,0,0. -func (m *enumerator) Current() ([]byte, int, uint64) { - var i int - var v uint64 - if m.lowCurr < len(m.lowIdxs) { - i = m.lowIdxs[m.lowCurr] - v = m.currVs[i] - } - return m.lowK, i, v -} - -// GetLowIdxsAndValues will return all of the iterator indices -// which point to the current key, and their corresponding -// values. This can be used by advanced caller which may need -// to peek into these other sets of data before processing. -func (m *enumerator) GetLowIdxsAndValues() ([]int, []uint64) { - values := make([]uint64, 0, len(m.lowIdxs)) - for _, idx := range m.lowIdxs { - values = append(values, m.currVs[idx]) - } - return m.lowIdxs, values -} - -// Next advances the enumerator to the next key/iterator/value result, -// else vellum.ErrIteratorDone is returned. -func (m *enumerator) Next() error { - m.lowCurr += 1 - if m.lowCurr >= len(m.lowIdxs) { - // move all the current low iterators forwards - for _, vi := range m.lowIdxs { - err := m.itrs[vi].Next() - if err != nil && err != vellum.ErrIteratorDone { - return err - } - m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current() - } - // can skip any empty keys encountered at this point - m.updateMatches(true) - } - if m.lowK == nil && len(m.lowIdxs) == 0 { - return vellum.ErrIteratorDone - } - return nil +type enumerator interface { + Current() ([]byte, int, uint64) + GetLowIdxsAndValues() ([]int, []uint64) + Next() error + Close() error } -// Close all the underlying Iterators. The first error, if any, will -// be returned. -func (m *enumerator) Close() error { - var rv error - for _, itr := range m.itrs { - err := itr.Close() - if rv == nil { - rv = err - } +func newEnumerator(itrs []vellum.Iterator) (enumerator, error) { + if len(itrs) < maxItrsPerBasicEnum { + return newBasicEnumerator(itrs, 0) + } else { + return newMergeEnumerator(itrs) } - return rv } diff --git a/enumerator_basic.go b/enumerator_basic.go new file mode 100644 index 0000000..e8c8597 --- /dev/null +++ b/enumerator_basic.go @@ -0,0 +1,140 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + + "github.com/blevesearch/vellum" +) + +// enumerator provides an ordered traversal of multiple vellum +// iterators. Like JOIN of iterators, the enumerator produces a +// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC, +// then iteratorIndex ASC, where the same key might be seen or +// repeated across multiple child iterators. +type basicEnumerator struct { + itrs []vellum.Iterator + currKs [][]byte + currVs []uint64 + id int + + lowK []byte + lowIdxs []int + lowCurr int +} + +// newEnumerator returns a new enumerator over the vellum Iterators +func newBasicEnumerator(itrs []vellum.Iterator, id int) (enumerator, error) { + rv := &basicEnumerator{ + itrs: itrs, + currKs: make([][]byte, len(itrs)), + currVs: make([]uint64, len(itrs)), + lowIdxs: make([]int, 0, len(itrs)), + id: id, + } + for i, itr := range rv.itrs { + rv.currKs[i], rv.currVs[i] = itr.Current() + } + rv.updateMatches(false) + if rv.lowK == nil && len(rv.lowIdxs) == 0 { + return rv, vellum.ErrIteratorDone + } + return rv, nil +} + +// updateMatches maintains the low key matches based on the currKs +func (m *basicEnumerator) updateMatches(skipEmptyKey bool) { + m.lowK = nil + m.lowIdxs = m.lowIdxs[:0] + m.lowCurr = 0 + + for i, key := range m.currKs { + if (key == nil && m.currVs[i] == 0) || // in case of empty iterator + (len(key) == 0 && skipEmptyKey) { // skip empty keys + continue + } + + cmp := bytes.Compare(key, m.lowK) + if cmp < 0 || len(m.lowIdxs) == 0 { + // reached a new low + m.lowK = key + m.lowIdxs = m.lowIdxs[:0] + m.lowIdxs = append(m.lowIdxs, i+m.id*maxItrsPerBasicEnum) + } else if cmp == 0 { + m.lowIdxs = append(m.lowIdxs, i+m.id*maxItrsPerBasicEnum) + } + } +} + +// Current returns the enumerator's current key, iterator-index, and +// value. If the enumerator is not pointing at a valid value (because +// Next returned an error previously), Current will return nil,0,0. +func (m *basicEnumerator) Current() ([]byte, int, uint64) { + var i int + var v uint64 + if m.lowCurr < len(m.lowIdxs) { + i = m.lowIdxs[m.lowCurr] + v = m.currVs[i%maxItrsPerBasicEnum] + } + return m.lowK, i, v +} + +// GetLowIdxsAndValues will return all of the iterator indices +// which point to the current key, and their corresponding +// values. This can be used by advanced caller which may need +// to peek into these other sets of data before processing. +func (m *basicEnumerator) GetLowIdxsAndValues() ([]int, []uint64) { + values := make([]uint64, 0, len(m.lowIdxs)) + for _, idx := range m.lowIdxs { + values = append(values, m.currVs[idx%maxItrsPerBasicEnum]) + } + return m.lowIdxs, values +} + +// Next advances the enumerator to the next key/iterator/value result, +// else vellum.ErrIteratorDone is returned. +func (m *basicEnumerator) Next() error { + m.lowCurr += 1 + if m.lowCurr >= len(m.lowIdxs) { + // move all the current low iterators forwards + for _, vi := range m.lowIdxs { + err := m.itrs[vi%maxItrsPerBasicEnum].Next() + if err != nil && err != vellum.ErrIteratorDone { + return err + } + m.currKs[vi%maxItrsPerBasicEnum], m.currVs[vi%maxItrsPerBasicEnum] = m.itrs[vi%maxItrsPerBasicEnum].Current() + } + // can skip any empty keys encountered at this point + m.updateMatches(true) + } + if m.lowK == nil && len(m.lowIdxs) == 0 { + return vellum.ErrIteratorDone + } + return nil +} + +// Close all the underlying Iterators. The first error, if any, will +// be returned. +func (m *basicEnumerator) Close() error { + var rv error + for _, itr := range m.itrs { + err := itr.Close() + if rv == nil { + rv = err + } + } + return rv +} diff --git a/enumerator_merge.go b/enumerator_merge.go new file mode 100644 index 0000000..be05edb --- /dev/null +++ b/enumerator_merge.go @@ -0,0 +1,156 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + + "github.com/blevesearch/vellum" +) + +type mergeEnumerator struct { + enums []enumerator + enumChecks []bool + + lowK []byte + lowIdxs []int + lowCurr int + + values []uint64 +} + +func newMergeEnumerator(itrs []vellum.Iterator) (enumerator, error) { + + numEnums := len(itrs)/maxItrsPerBasicEnum + 1 + rv := &mergeEnumerator{ + enums: make([]enumerator, numEnums), + enumChecks: make([]bool, numEnums), + lowIdxs: make([]int, 0, len(itrs)), + values: make([]uint64, len(itrs)), + } + + var err error + var start, end int + for i := 0; i < numEnums; i++ { + start = i * maxItrsPerBasicEnum + end = (i + 1) * maxItrsPerBasicEnum + if end > len(itrs) { + end = len(itrs) + } + rv.enums[i], err = newBasicEnumerator(itrs[start:end], i) + if err != nil { + return nil, err + } + } + + rv.updateMatches(false) + if rv.lowK == nil && len(rv.lowIdxs) == 0 { + return rv, vellum.ErrIteratorDone + } + return rv, nil +} + +func (m *mergeEnumerator) updateMatches(skipEmptyKey bool) { + m.lowK = nil + m.lowIdxs = m.lowIdxs[:0] + m.lowCurr = 0 + var lowIdxs []int + + for _, enum := range m.enums { + key, _, v := enum.Current() + if (key == nil && v == 0) || (len(key) == 0 && skipEmptyKey) { + continue + } + cmp := bytes.Compare(key, m.lowK) + if cmp <= 0 || len(m.lowIdxs) == 0 { + lowIdxs, _ = enum.GetLowIdxsAndValues() + } + if cmp < 0 || len(m.lowIdxs) == 0 { + m.lowK = key + m.lowIdxs = m.lowIdxs[:0] + m.lowIdxs = append(m.lowIdxs, lowIdxs...) + } else if cmp == 0 { + m.lowIdxs = append(m.lowIdxs, lowIdxs...) + } + } +} + +func (m *mergeEnumerator) Current() ([]byte, int, uint64) { + var i, j int + if m.lowCurr < len(m.lowIdxs) { + i = m.lowIdxs[m.lowCurr] + j = i / maxItrsPerBasicEnum + return m.enums[j].Current() + } + return nil, 0, 0 +} + +func (m *mergeEnumerator) resetEnumChecks() { + for i := range m.enumChecks { + m.enumChecks[i] = false + } +} + +func (m *mergeEnumerator) GetLowIdxsAndValues() ([]int, []uint64) { + m.values = m.values[:0] + m.resetEnumChecks() + + for _, idx := range m.lowIdxs { + i := idx / maxItrsPerBasicEnum + + if !m.enumChecks[i] { + _, values := m.enums[i].GetLowIdxsAndValues() + m.values = append(m.values, values...) + m.enumChecks[i] = true + } + } + return m.lowIdxs, m.values +} + +func (m *mergeEnumerator) Next() error { + m.lowCurr += 1 + if m.lowCurr >= len(m.lowIdxs) { + m.resetEnumChecks() + for _, idx := range m.lowIdxs { + i := idx / maxItrsPerBasicEnum + m.enumChecks[i] = true + } + + for i, update := range m.enumChecks { + if update { + err := m.enums[i].Next() + if err != nil && err != vellum.ErrIteratorDone { + return err + } + } + } + m.updateMatches(true) + } + if m.lowK == nil && len(m.lowIdxs) == 0 { + return vellum.ErrIteratorDone + } + return nil +} + +func (m *mergeEnumerator) Close() error { + var rv error + for _, enum := range m.enums { + err := enum.Close() + if rv == nil { + rv = err + } + } + return rv +} diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index 0862b2e..a038e1a 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -273,9 +273,11 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. return nil, 0, err } // close the enumerator to free the underlying iterators - err = enumerator.Close() - if err != nil { - return nil, 0, err + if enumerator != nil { + err = enumerator.Close() + if err != nil { + return nil, 0, err + } } err = finishTerm(prevTerm)