Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions azure-pipeline-templates/huge-list-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ steps:
env:
mount_dir: ${{ parameters.mount_dir }}

- script: grep "OUTGOING REQUEST" blobfuse2-logs.txt | wc -l
displayName: 'HugeList: ${{ parameters.idstring }} Request Count'
continueOnError: true

- script: |
cat blobfuse2-logs.txt
displayName: 'View Logs'
Expand Down
224 changes: 156 additions & 68 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ func (bb *BlockBlob) Configure(cfg AzStorageConfig) error {
}

bb.listDetails = container.ListBlobsInclude{
Metadata: true,
Deleted: false,
Snapshots: false,
Metadata: true,
Deleted: false,
Snapshots: false,
Permissions: false, //Added to get permissions, acl, group, owner for HNS accounts
}

return nil
Expand Down Expand Up @@ -457,6 +458,8 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err

parseMetadata(attr, prop.Metadata)

attr.Flags.Set(internal.PropFlagMetadataRetrieved)
// We do not get permissions as part of this getAttr call hence setting the flag to true
attr.Flags.Set(internal.PropFlagModeDefault)

return attr, nil
Expand Down Expand Up @@ -534,16 +537,11 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
}
}(marker))

blobList := make([]*internal.ObjAttr, 0)

if count == 0 {
count = common.MaxDirListCount
}

listPath := filepath.Join(bb.Config.prefixPath, prefix)
if (prefix != "" && prefix[len(prefix)-1] == '/') || (prefix == "" && bb.Config.prefixPath != "") {
listPath += "/"
}
listPath := bb.getListPath(prefix)

// Get a result segment starting with the blob indicated by the current Marker.
pager := bb.Container.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Expand All @@ -562,46 +560,47 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern

if err != nil {
log.Err("BlockBlob::List : Failed to list the container with the prefix %s", err.Error)
return blobList, nil, err
}

dereferenceTime := func(input *time.Time, defaultTime time.Time) time.Time {
if input == nil {
return defaultTime
} else {
return *input
}
return nil, nil, err
}

// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
// Since block blob does not support acls, we set mode to 0 and FlagModeDefault to true so the fuse layer can return the default permission.

blobList, dirList, err := bb.processBlobItems(listBlob.Segment.BlobItems)
if err != nil {
return nil, nil, err
}

// In case virtual directory exists but its corresponding 0 byte marker file is not there holding hdi_isfolder then just iterating
// over BlobItems will fail to identify that directory. In such cases BlobPrefixes help to list all directories
// dirList contains all dirs for which we got 0 byte meta file in this iteration, so exclude those and add rest to the list
// Note: Since listing is paginated, sometimes the marker file may come in a different iteration from the BlobPrefix. For such
// cases we manually call GetAttr to check the existence of the marker file.
err = bb.processBlobPrefixes(listBlob.Segment.BlobPrefixes, dirList, &blobList)
if err != nil {
return nil, nil, err
}

return blobList, listBlob.NextMarker, nil
}

// getListPath returns the path used as the listing prefix for List.
//
// The configured prefixPath is joined with the caller supplied prefix. Because
// filepath.Join drops any trailing separator, a "/" is appended again when the
// caller passed a prefix ending in "/" or when the prefix is empty but a
// prefixPath is configured.
func (bb *BlockBlob) getListPath(prefix string) string {
	root := bb.Config.prefixPath
	joined := filepath.Join(root, prefix)

	// Decide whether the joined path must end with a separator.
	var wantSlash bool
	if prefix == "" {
		wantSlash = root != ""
	} else {
		wantSlash = prefix[len(prefix)-1] == '/'
	}

	if !wantSlash {
		return joined
	}
	return joined + "/"
}

func (bb *BlockBlob) processBlobItems(blobItems []*container.BlobItem) ([]*internal.ObjAttr, map[string]bool, error) {
blobList := make([]*internal.ObjAttr, 0)
// For some directories 0 byte meta file may not exists so just create a map to figure out such directories
var dirList = make(map[string]bool)
for _, blobInfo := range listBlob.Segment.BlobItems {
attr := &internal.ObjAttr{}
if blobInfo.Properties.CustomerProvidedKeySHA256 != nil && *blobInfo.Properties.CustomerProvidedKeySHA256 != "" {
log.Trace("BlockBlob::List : blob is encrypted with customer provided key so fetching metadata explicitly using REST")
attr, err = bb.getAttrUsingRest(*blobInfo.Name)
if err != nil {
log.Err("BlockBlob::List : Failed to get properties of blob %s", *blobInfo.Name)
return blobList, nil, err
}
} else {
attr = &internal.ObjAttr{
Path: split(bb.Config.prefixPath, *blobInfo.Name),
Name: filepath.Base(*blobInfo.Name),
Size: *blobInfo.Properties.ContentLength,
Mode: 0,
Mtime: *blobInfo.Properties.LastModified,
Atime: dereferenceTime(blobInfo.Properties.LastAccessedOn, *blobInfo.Properties.LastModified),
Ctime: *blobInfo.Properties.LastModified,
Crtime: dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
}
parseMetadata(attr, blobInfo.Metadata)
attr.Flags.Set(internal.PropFlagModeDefault)
dirList := make(map[string]bool)

for _, blobInfo := range blobItems {
attr, err := bb.getBlobAttr(blobInfo)
if err != nil {
return nil, nil, err
}
blobList = append(blobList, attr)

Expand All @@ -612,35 +611,77 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
}
}

// In case virtual directory exists but its corresponding 0 byte marker file is not there holding hdi_isfolder then just iterating
// over BlobItems will fail to identify that directory. In such cases BlobPrefixes help to list all directories
// dirList contains all dirs for which we got 0 byte meta file in this iteration, so exclude those and add rest to the list
// Note: Since listing is paginated, sometimes the marker file may come in a different iteration from the BlobPrefix. For such
// cases we manually call GetAttr to check the existence of the marker file.
for _, blobInfo := range listBlob.Segment.BlobPrefixes {
return blobList, dirList, nil
}

// getBlobAttr converts a single blob entry returned by a list call into an
// ObjAttr.
//
// Blobs encrypted with a customer provided key are resolved through
// getAttrUsingRest instead of the list entry. For every other blob the
// attributes are built from the list entry itself: the permission string is
// converted to a file mode, and missing access/creation times fall back to the
// last-modified time.
//
// PropFlagModeDefault is set whenever the mode in the attribute cannot be
// trusted: either permissions were not requested in the listing, or the
// permission string returned by the service could not be converted.
func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAttr, error) {
	props := blobInfo.Properties
	if props.CustomerProvidedKeySHA256 != nil && *props.CustomerProvidedKeySHA256 != "" {
		log.Trace("BlockBlob::getBlobAttr : blob is encrypted with customer provided key so fetching metadata explicitly using REST")
		return bb.getAttrUsingRest(*blobInfo.Name)
	}

	// When permissions are not part of the listing the mode is always the default one.
	useDefaultMode := !bb.listDetails.Permissions

	mode, err := bb.getFileMode(props.Permissions)
	if err != nil {
		// Best effort: keep listing, but do not expose a bogus mode of 0.
		mode = 0
		useDefaultMode = true
		log.Warn("BlockBlob::getBlobAttr : Failed to get file mode for %s [%s]", *blobInfo.Name, err.Error())
	}

	attr := &internal.ObjAttr{
		Path:   split(bb.Config.prefixPath, *blobInfo.Name),
		Name:   filepath.Base(*blobInfo.Name),
		Size:   *props.ContentLength,
		Mode:   mode,
		Mtime:  *props.LastModified,
		Atime:  bb.dereferenceTime(props.LastAccessedOn, *props.LastModified),
		Ctime:  *props.LastModified,
		Crtime: bb.dereferenceTime(props.CreationTime, *props.LastModified),
		Flags:  internal.NewFileBitMap(),
		MD5:    props.ContentMD5,
	}

	// The flag is set after parseMetadata, matching the original statement order.
	parseMetadata(attr, blobInfo.Metadata)
	if useDefaultMode {
		attr.Flags.Set(internal.PropFlagModeDefault)
	}

	return attr, nil
}

// getFileMode converts an optional permission string into a file mode.
// A nil permission string yields mode 0 and no error; otherwise the value is
// handed to the package-level getFileMode and its result is returned as is.
func (bb *BlockBlob) getFileMode(permissions *string) (os.FileMode, error) {
	if permissions != nil {
		return getFileMode(*permissions)
	}
	return 0, nil
}

// dereferenceTime returns the time pointed to by input, or defaultTime when
// input is nil.
func (bb *BlockBlob) dereferenceTime(input *time.Time, defaultTime time.Time) time.Time {
	result := defaultTime
	if input != nil {
		result = *input
	}
	return result
}

func (bb *BlockBlob) processBlobPrefixes(blobPrefixes []*container.BlobPrefix, dirList map[string]bool, blobList *[]*internal.ObjAttr) error {
for _, blobInfo := range blobPrefixes {
if _, ok := dirList[*blobInfo.Name]; ok {
// marker file found in current iteration, skip adding the directory
continue
} else {
// marker file not found in current iteration, so we need to manually check attributes via REST
_, err := bb.getAttrUsingRest(*blobInfo.Name)
// marker file also not found via manual check, safe to add to list
if err == syscall.ENOENT {
// For these dirs we get only the name and no other properties so hardcoding time to current time
name := strings.TrimSuffix(*blobInfo.Name, "/")
attr := &internal.ObjAttr{
Path: split(bb.Config.prefixPath, name),
Name: filepath.Base(name),
Size: 4096,
Mode: os.ModeDir,
Mtime: time.Now(),
Flags: internal.NewDirBitMap(),
//Check to see if its a HNS account and we received properties in blob prefixes
if bb.listDetails.Permissions {
attr, err := bb.createDirAttrWithPermissions(blobInfo)
if err != nil {
return err
}
*blobList = append(*blobList, attr)
} else {
// marker file not found in current iteration, so we need to manually check attributes via REST
_, err := bb.getAttrUsingRest(*blobInfo.Name)
// marker file also not found via manual check, safe to add to list
if err == syscall.ENOENT {
attr := bb.createDirAttr(*blobInfo.Name)
*blobList = append(*blobList, attr)
}
attr.Atime = attr.Mtime
attr.Crtime = attr.Mtime
attr.Ctime = attr.Mtime
attr.Flags.Set(internal.PropFlagModeDefault)
blobList = append(blobList, attr)
}
}
}
Expand All @@ -650,7 +691,54 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
delete(dirList, k)
}

return blobList, listBlob.NextMarker, nil
return nil
}

// createDirAttr builds the attributes of a directory for which only the name
// is known. The trailing "/" is removed from the name, the size is fixed to
// 4096, and every timestamp is set to the current time because no other
// properties are available for such directories.
func (bb *BlockBlob) createDirAttr(name string) *internal.ObjAttr {
	dirName := strings.TrimSuffix(name, "/")
	now := time.Now()

	attr := &internal.ObjAttr{
		Path:   split(bb.Config.prefixPath, dirName),
		Name:   filepath.Base(dirName),
		Size:   4096,
		Mode:   os.ModeDir,
		Mtime:  now,
		Atime:  now,
		Ctime:  now,
		Crtime: now,
		Flags:  internal.NewDirBitMap(),
	}

	// Used only for FNS accounts, when a blob prefix exists without its marker blob.
	attr.Flags.Set(internal.PropFlagModeDefault)

	return attr
}

// createDirAttrWithPermissions builds the attributes of a directory from a
// blob prefix that carries properties (size, timestamps and permissions).
//
// An error is returned when the prefix has no properties. The permission
// string is converted to a file mode and combined with os.ModeDir, so the mode
// marks the entry as a directory just like createDirAttr does. If the
// permission string cannot be converted, the permission bits are left at 0 and
// PropFlagModeDefault is set so the entry is not exposed with a bogus mode.
// Missing access/creation times fall back to the last-modified time.
func (bb *BlockBlob) createDirAttrWithPermissions(blobInfo *container.BlobPrefix) (*internal.ObjAttr, error) {
	props := blobInfo.Properties
	if props == nil {
		return nil, fmt.Errorf("failed to get properties of blobprefix %s", *blobInfo.Name)
	}

	modeKnown := true
	mode, err := bb.getFileMode(props.Permissions)
	if err != nil {
		// Best effort: keep listing, but remember that the mode is unusable.
		mode = 0
		modeKnown = false
		log.Warn("BlockBlob::createDirAttrWithPermissions : Failed to get file mode for %s [%s]", *blobInfo.Name, err.Error())
	}

	name := strings.TrimSuffix(*blobInfo.Name, "/")
	attr := &internal.ObjAttr{
		Path:   split(bb.Config.prefixPath, name),
		Name:   filepath.Base(name),
		Size:   *props.ContentLength,
		Mode:   mode | os.ModeDir,
		Mtime:  *props.LastModified,
		Atime:  bb.dereferenceTime(props.LastAccessedOn, *props.LastModified),
		Ctime:  *props.LastModified,
		Crtime: bb.dereferenceTime(props.CreationTime, *props.LastModified),
		Flags:  internal.NewDirBitMap(),
	}

	if !modeKnown {
		attr.Flags.Set(internal.PropFlagModeDefault)
	}

	return attr, nil
}

// track the progress of download of blobs where every 100MB of data downloaded is being tracked. It also tracks the completion of download
Expand Down
41 changes: 41 additions & 0 deletions component/azstorage/block_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3452,6 +3452,47 @@ func (suite *blockBlobTestSuite) UtilityFunctionTruncateFileToLarger(size int, t

}

// TestList exercises List in four ways: a plain listing of the container
// root, a listing under a prefix, a listing with an invalid marker, and a
// listing limited to a single entry.
func (s *blockBlobTestSuite) TestList() {
	defer s.cleanupTest()

	// Setup: re-create the component against the dfs endpoint.
	s.tearDownTestHelper(false) // Don't delete the generated container.
	cfg := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: block\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: true",
		storageTestConfigurationParameters.BlockAccount, storageTestConfigurationParameters.BlockAccount, storageTestConfigurationParameters.BlockKey, s.container)
	s.setupTestHelper(cfg, s.container, true)

	base := generateDirectoryName()
	s.setupHierarchy(base)

	// An exhausted listing is expected to report an empty marker.
	noMarker := ""

	// Plain listing of the root
	blobList, marker, err := s.az.storage.List("", nil, 0)
	s.assert.Nil(err)
	s.assert.Equal(&noMarker, marker)
	s.assert.NotNil(blobList)
	s.assert.Len(blobList, 3)

	// Listing with prefix
	blobList, marker, err = s.az.storage.List(base+"b/", nil, 0)
	s.assert.Nil(err)
	s.assert.Equal(&noMarker, marker)
	s.assert.NotNil(blobList)
	s.assert.Len(blobList, 1)
	s.assert.EqualValues("c1", blobList[0].Name)

	// Listing with an invalid marker must fail and return nothing
	blobList, marker, err = s.az.storage.List(base, to.Ptr("invalid-marker"), 0)
	s.assert.NotNil(err)
	s.assert.Empty(blobList)
	s.assert.Nil(marker)

	// Listing with count
	blobList, marker, err = s.az.storage.List("", nil, 1)
	s.assert.Nil(err)
	s.assert.NotNil(blobList)
	s.assert.NotEmpty(marker)
	s.assert.Len(blobList, 1)
	s.assert.EqualValues(base, blobList[0].Path)
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockBlob(t *testing.T) {
Expand Down
Loading