Skip to content
This repository has been archived by the owner on Apr 20, 2020. It is now read-only.

Commit

Permalink
Merge pull request #127 from burgerdev/OpLabelVolume-optimization
Browse files Browse the repository at this point in the history
OpLabelVolume optimization
  • Loading branch information
burgerdev committed Mar 26, 2014
2 parents 3949e33 + 010985b commit 02a8b2c
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 131 deletions.
228 changes: 109 additions & 119 deletions lazyflow/operators/opLabelVolume.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

from threading import Lock as ThreadLock
from functools import partial
from abc import ABCMeta, abstractmethod, abstractproperty
import logging

import numpy as np
import vigra
Expand All @@ -12,14 +14,7 @@
from lazyflow.request import Request, RequestPool
from lazyflow.operators import OpCompressedCache, OpReorderAxes

# try to import the blockedarray module, fail only if necessary
try:
import blockedarray
except ImportError as e:
_blockedarray_module_available = False
_importMsg = str(e)
else:
_blockedarray_module_available = True
logger = logging.getLogger(__name__)


## OpLabelVolume - the **unified** connected components operator
Expand Down Expand Up @@ -51,15 +46,13 @@ class OpLabelVolume(Operator):
## Labeled volume
# Axistags and shape are the same as on the Input, dtype is an integer
# datatype.
# Note: The output might be cached internally depending on the chosen CC
# method, use the CachedOutput to avoid duplicate caches. (see inner
# operator below for details)
# Note: This output is just an alias for CachedOutput, because all
# implementations use internal caches.
Output = OutputSlot()

## Cached label image
# Access the internal cache. If you were planning to cache the labeled
# volume, be sure to use this slot, since it makes use of the internal
# cache that might be in use anyway.
# Axistags and shape are the same as on the Input, dtype is an integer
# datatype.
CachedOutput = OutputSlot()

name = "OpLabelVolume"
Expand Down Expand Up @@ -105,7 +98,7 @@ def setupOutputs(self):
self._op5_2_cached.Input.connect(self._opLabel.CachedOutput)

# set the final reordering operator's AxisOrder to that of the input
origOrder = "".join([s for s in self.Input.meta.getTaggedShape()])
origOrder = self.Input.meta.getAxisKeys()
self._op5_2.AxisOrder.setValue(origOrder)
self._op5_2_cached.AxisOrder.setValue(origOrder)

Expand Down Expand Up @@ -145,6 +138,8 @@ def _setBG(self):

## parent class for all connected component labeling implementations
class OpLabelingABC(Operator):
__metaclass__ = ABCMeta

## input with axes 'xyzct'
Input = InputSlot()

Expand All @@ -154,6 +149,11 @@ class OpLabelingABC(Operator):
Output = OutputSlot()
CachedOutput = OutputSlot()

## list of supported dtypes
@abstractproperty
def supportedDtypes(self):
pass

def __init__(self, *args, **kwargs):
super(OpLabelingABC, self).__init__(*args, **kwargs)
self._cache = None
Expand All @@ -162,6 +162,15 @@ def __init__(self, *args, **kwargs):
def setupOutputs(self):
labelType = np.uint32

# check if the input dtype is valid
if self.Input.ready():
dtype = self.Input.meta.dtype
if dtype not in self.supportedDtypes:
msg = "{}: dtype '{}' not supported "\
"with method 'vigra'. Supported types: {}"
msg = msg.format(self.name, dtype, self.supportedDtypes)
raise ValueError(msg)

# remove unneeded old cache
if self._cache is not None:
self._cache.Input.disconnect()
Expand All @@ -181,11 +190,9 @@ def setupOutputs(self):
self.Output.meta.assignFrom(self._cache.Output.meta)
self.CachedOutput.meta.assignFrom(self._cache.Output.meta)

# prepare locks for each channel and time slice
s = self.Input.meta.getTaggedShape()
shape = (s['c'], s['t'])
self._cached = np.zeros(shape)

# prepare locks for each channel and time slice
locks = np.empty(shape, dtype=np.object)
for c in range(s['c']):
for t in range(s['t']):
Expand All @@ -204,7 +211,9 @@ def propagateDirty(self, slot, subindex, roi):
self.CachedOutput.setDirty(outroi)

def execute(self, slot, subindex, roi, result):
#FIXME we don't care right now which slot is requested, just return cached CC
#FIXME we don't care right now which slot is requested, just return
# cached CC (all implementations cache anyways)

# get the background values
bg = self.Background[...].wait()
bg = vigra.taggedView(bg, axistags=self.Background.meta.axistags)
Expand All @@ -216,58 +225,86 @@ def execute(self, slot, subindex, roi, result):
# do labeling in parallel over channels and time slices
pool = RequestPool()

## function for request ##
def singleSliceRequest(start3d, stop3d, c, t, bg):
# computing CC is a critical section
self._locks[c, t].acquire()

# update the slice
self._updateCache(start3d, stop3d, c, t, bg)

# leave the critical section
self._locks[c, t].release()
## end function for request ##

for t in range(roi.start[4], roi.stop[4]):
for c in range(roi.start[3], roi.stop[3]):
# only one thread may update the cache for this c and t, other requests
# must wait until labeling is finished
self._locks[c, t].acquire()
if self._cached[c, t]:
# this slice is already computed
continue
# update the whole slice
req = Request(partial(self._updateSlice, c, t, bg[c, t]))
req = Request(partial(singleSliceRequest,
roi.start[:3], roi.stop[:3],
c, t, bg[c, t]))
pool.add(req)

logger.debug("{}: Computing connected components for ROI {}".format(
self.name, roi))
pool.wait()
pool.clean()

req = self._cache.Output.get(roi)
req.writeInto(result)
req.block()

# release locks and set caching flags
for t in range(roi.start[4], roi.stop[4]):
for c in range(roi.start[3], roi.stop[3]):
self._cached[c, t] = 1
self._locks[c, t].release()

## compute the requested slice and put the results into self._cache
## compute the requested roi and put the results into self._cache
#
@abstractmethod
def _updateCache(self, start3d, stop3d, c, t, bg):
pass


class OpNonLazyCC(OpLabelingABC):
    """Base class for labeling implementations that always compute a whole
    spatial (xyz) slice at once.

    Keeps a boolean flag per (channel, time) combination so that a slice is
    labeled at most once; subsequent cache updates for an already-computed
    slice are no-ops.
    """

    def setupOutputs(self):
        if self.Input.ready():
            s = self.Input.meta.getTaggedShape()
            shape = (s['c'], s['t'])
            # per-(c, t) "already computed" flags; builtin bool replaces the
            # deprecated np.bool alias (removed in NumPy >= 1.24, identical
            # behavior)
            self._cached = np.zeros(shape, dtype=bool)
        super(OpNonLazyCC, self).setupOutputs()

    ## wraps the children's _updateSlice function to check whether
    ## recomputation is needed
    def _updateCache(self, start3d, stop3d, c, t, bg):
        # we compute the whole slice, regardless of the actual roi, if needed
        if self._cached[c, t]:
            return
        self._updateSlice(c, t, bg)
        self._cached[c, t] = True

    ## compute the labeling of the full slice at channel c, time t with
    ## background value bg (must be provided by concrete subclasses)
    @abstractmethod
    def _updateSlice(self, c, t, bg):
        pass


## vigra connected components
class _OpLabelVigra(OpLabelingABC):
class _OpLabelVigra(OpNonLazyCC):
name = "OpLabelVigra"
supportedDtypes = [np.uint8, np.uint32, np.float32]

def setupOutputs(self):
if self.Input.ready():
supportedDtypes = [np.uint8, np.uint32, np.float32]
dtype = self.Input.meta.dtype
if dtype not in supportedDtypes:
msg = "OpLabelVolume: dtype '{}' not supported "\
"with method 'vigra'. Supported types: {}"
msg = msg.format(dtype, supportedDtypes)
raise ValueError(msg)
super(_OpLabelVigra, self).setupOutputs()
def propagateDirty(self, slot, subindex, roi):
# set the cache to dirty
self._cached[roi.start[3]:roi.stop[3],
roi.start[4]:roi.stop[4]] = 0
super(_OpLabelVigra, self).propagateDirty(slot, subindex, roi)

def _updateSlice(self, c, t, bg):
source = vigra.taggedView(self.Input[..., c, t].wait(),
axistags='xyzct')
source = source.withAxes(*'xyz')
result = vigra.analysis.labelVolumeWithBackground(
source, background_value=int(bg))
if source.shape[2] > 1:
result = vigra.analysis.labelVolumeWithBackground(
source, background_value=int(bg))
else:
result = vigra.analysis.labelImageWithBackground(
source[..., 0], background_value=int(bg))
result = result.withAxes(*'xyzct')

stop = np.asarray(self.Input.meta.shape)
Expand All @@ -279,22 +316,37 @@ def _updateSlice(self, c, t, bg):
self._cache.setInSlot(self._cache.Input, (), roi, result)


## blockedarray connected components
class _OpLabelBlocked(OpLabelingABC):
# try to import the blockedarray module, fail only if neccessary
try:
from blockedarray import OpBlockedConnectedComponents
except ImportError as e:
_blockedarray_module_available = False
_importMsg = str(e)
class OpBlockedConnectedComponents(object):
pass
else:
_blockedarray_module_available = True
_importMsg = "No error, importing blockedarray worked."

def haveBlocked():
return _blockedarray_module_available


## Wrapper for blockedarray.OpBlockedConnectedComponents
# This wrapper takes care that the module is indeed imported, and sets the
# block shape for the cache.
class _OpLabelBlocked(OpBlockedConnectedComponents):
name = "OpLabelBlocked"

def _updateSlice(self, c, t, bg):
assert _blockedarray_module_available,\
"Failed to import blockedarray: {}".format(_importMsg)
assert bg == 0,\
"Blocked Labeling not implemented for background value {}".format(bg)
"Failed to import blockedarray. Message was: {}".format(_importMsg)

blockShape = _findBlockShape(self.Input.meta.shape[:3])
blockShape = _findBlockShape(self.Input.meta.shape[:3]) + (1, 1)
logger.debug("{}: Using blockshape {}".format(self.name, blockShape))
self._cache.BlockShape.setValue(blockShape)
super(_OpLabelBlocked, self)._updateSlice(c, t, bg)

source = _Source(self.Input, blockShape, c, t)
sink = _Sink(self._cache, c, t)
cc = blockedarray.dim3.ConnectedComponents(source, blockShape)
cc.writeToSink(sink)


## Feeds meta data into OpCompressedCache
Expand All @@ -320,68 +372,6 @@ def setMeta(self, meta):
self.Output.meta.assignFrom(meta)


if _blockedarray_module_available:

def fullRoiFromPQ(slot, p, q, c, t):
    """Expand a 3D spatial roi into a 5D SubRegion on *slot*.

    Parameters:
        p, q -- 3-element spatial start/stop coordinates
        c, t -- scalar channel and time indices; the resulting region
                selects exactly one channel and one time slice

    Returns a SubRegion with start (p, c, t) and stop (q, c+1, t+1).
    """
    # builtin int replaces the deprecated np.int alias (removed in
    # NumPy >= 1.24); it was always identical to the builtin anyway
    fullp = np.zeros((5,), dtype=int)
    fullq = np.zeros((5,), dtype=int)
    fullp[:3] = p
    fullq[:3] = q
    fullp[3] = c
    fullp[4] = t
    fullq[3] = c + 1
    fullq[4] = t + 1
    return SubRegion(slot, start=fullp, stop=fullq)

class _Source(blockedarray.adapters.SourceABC):
    """Adapter exposing the 3D spatial data of a lazyflow slot (at a fixed
    channel c and time t) as a readable source for blockedarray.
    """

    def __init__(self, slot, blockShape, c, t):
        super(_Source, self).__init__()
        self._slot = slot
        self._blockShape = blockShape  # is passed to blockedarray!
        # current roi, defaults to the full spatial volume;
        # np.int64 replaces the deprecated np.long alias (removed in
        # NumPy >= 1.24) and is always wide enough for index arithmetic
        self._p = np.zeros((3,), dtype=np.int64)
        self._q = np.asarray(slot.meta.shape[:3], dtype=np.int64)
        self._c = c
        self._t = t

    def pySetRoi(self, roi):
        # roi is a (start, stop) pair of 3D coordinates
        assert len(roi) == 2
        self._p = np.asarray(roi[0], dtype=np.int64)
        self._q = np.asarray(roi[1], dtype=np.int64)

    def pyShape(self):
        # spatial shape only — blockedarray works on the 3D volume
        return tuple(self._slot.meta.shape[:3])

    def pyReadBlock(self, roi, output):
        """Read the block given by roi (relative to the roi set via
        pySetRoi) from the slot into *output*.

        Raises IndexError if the request exceeds the configured roi.
        """
        assert len(roi) == 2
        roiP = np.asarray(roi[0])
        roiQ = np.asarray(roi[1])
        # translate the block-relative roi into absolute coordinates
        p = self._p + roiP
        q = p + roiQ - roiP
        if np.any(q > self._q):
            raise IndexError("Requested roi is too large for selected "
                             "roi (previous call to setRoi)")
        sub = fullRoiFromPQ(self._slot, p, q, self._c, self._t)
        req = self._slot.get(sub)
        # append singleton c and t axes so shapes match the 5D slot
        req.writeInto(output[..., np.newaxis, np.newaxis])
        req.block()
        return True

class _Sink(blockedarray.adapters.SinkABC):
    """Adapter that writes labeled blocks produced by blockedarray back
    into a cache operator at a fixed channel/time position.
    """

    def __init__(self, op, c, t):
        super(_Sink, self).__init__()
        self._op = op
        self._c = c
        self._t = t

    def pyWriteBlock(self, roi, block):
        # roi is a (start, stop) pair of 3D coordinates
        assert len(roi) == 2
        start = np.asarray(roi[0])
        stop = np.asarray(roi[1])
        sub = fullRoiFromPQ(self._op.Input, start, stop, self._c, self._t)
        # append singleton c and t axes so the block matches the 5D slot
        expanded = block[..., np.newaxis, np.newaxis]
        self._op.setInSlot(self._op.Input, (), sub, expanded)


## find a good block shape for given input shape
# Set the block shape such that it
# (a) divides the volume shape evenly
Expand Down
Loading

0 comments on commit 02a8b2c

Please sign in to comment.