"""
Many of the major data structures to support the new applier parallel
architecture.
"""
import os
import socket
from multiprocessing.managers import BaseManager
from concurrent import futures
import threading
import secrets
import time
import contextlib
import queue
import tempfile
import traceback
import numpy
from osgeo import gdal
try:
import cloudpickle
except ImportError:
cloudpickle = None
from . import rioserrors
# Named constants for the available kinds of compute worker. These are
# passed as the computeWorkerKind parameter of ConcurrencyStyle.
CW_NONE = "CW_NONE"          # No compute concurrency
CW_THREADS = "CW_THREADS"    # Pool of compute threads in the main process
CW_PBS = "CW_PBS"            # Workers as jobs on a PBS batch queue
CW_SLURM = "CW_SLURM"        # Workers as jobs on a SLURM batch queue
CW_AWSBATCH = "CW_AWSBATCH"  # Workers as jobs on AWS Batch
CW_SUBPROC = "CW_SUBPROC"    # Presumably local subprocess workers — not described here; verify
class ConcurrencyStyle:
    """
    Holds all parameters associated with the different styles of
    concurrency.

    By default, concurrency is switched off. Concurrency can be switched
    on in two areas: reading, and computation. Writing of output blocks
    is always done one block at a time (GDAL does not support write
    concurrency), but can overlap with reading and computation of later
    blocks.

    Read concurrency:

    numReadWorkers: int
        Number of read worker threads within the main RIOS process.
        Zero means reading happens sequentially in the main loop.
        When computeWorkersRead is True, each compute worker runs this
        many read threads of its own.

    Compute concurrency:

    computeWorkerKind: str
        One of {CW_NONE, CW_THREADS, CW_PBS, CW_SLURM, CW_AWSBATCH,
        CW_SUBPROC}, selecting the paradigm used to distribute compute
        workers. CW_THREADS runs a pool of threads within the main RIOS
        process, and is the best starting point. The batch-queue kinds
        (CW_PBS, CW_SLURM, CW_AWSBATCH) run workers as batch jobs,
        possibly on separate machines, and are somewhat experimental.
        CW_NONE means computation happens sequentially in the main loop.
        See the `Concurrency <concurrency.html>`_ doc page for a deeper
        discussion.
    numComputeWorkers: int
        Number of distinct compute workers. Zero means computation
        happens within the main processing loop.
    computeWorkersRead: bool
        If True, each compute worker does its own reading (suitable for
        batch-queue workers with direct access to the input files). If
        False, all reading is done by the main RIOS process and data is
        sent to each worker. False is required for CW_THREADS.
    singleBlockComputeWorkers: bool
        Batch-queue kinds only. If True, numComputeWorkers is ignored
        and one batch job is generated per block, leaving load balancing
        to the batch queue system. Use with caution regarding timeouts.
    haveSharedTemp: bool
        If True, compute workers can all see a shared temp directory,
        used (by the PBS and SLURM kinds) to share a user-only text file
        with the network address for other communication. If False, the
        address is passed on the batch job command line, which is
        publicly visible and so less secure.

    Buffering timeouts (seconds), each per single block, and each may be
    None to wait forever. If a timeout fires, first rule out errors in
    the relevant workers before increasing it:

    readBufferInsertTimeout: int
        Wait to insert a new (empty) block into the read buffer
    readBufferPopTimeout: int
        Wait to pop a complete block out of the read buffer
    computeBufferInsertTimeout: int
        Wait to insert a completed block into the compute buffer
    computeBufferPopTimeout: int
        Wait to pop a block out of the compute buffer for writing
    computeBarrierTimeout: int
        Batch-oriented kinds with singleBlockComputeWorkers=False only.
        Wait at the barrier for all batch compute workers to start.
    """
    def __init__(self, numReadWorkers=0, numComputeWorkers=0,
            computeWorkerKind=CW_NONE,
            computeWorkersRead=False,
            singleBlockComputeWorkers=False,
            haveSharedTemp=True,
            readBufferInsertTimeout=10,
            readBufferPopTimeout=10,
            computeBufferInsertTimeout=10,
            computeBufferPopTimeout=20,
            computeBarrierTimeout=600
            ):
        self.numReadWorkers = numReadWorkers
        self.numComputeWorkers = numComputeWorkers
        self.computeWorkerKind = computeWorkerKind
        self.computeWorkersRead = computeWorkersRead
        self.singleBlockComputeWorkers = singleBlockComputeWorkers
        self.haveSharedTemp = haveSharedTemp
        self.readBufferInsertTimeout = readBufferInsertTimeout
        self.readBufferPopTimeout = readBufferPopTimeout
        self.computeBufferInsertTimeout = computeBufferInsertTimeout
        self.computeBufferPopTimeout = computeBufferPopTimeout
        self.computeBarrierTimeout = computeBarrierTimeout
        self._checkParamCombinations()

    def _checkParamCombinations(self):
        """
        Raise ValueError for any invalid combination of the constructor
        parameters. The checks (and their order) define which invalid
        combination is reported first.
        """
        kind = self.computeWorkerKind
        nCompute = self.numComputeWorkers
        nRead = self.numReadWorkers
        singleBlock = self.singleBlockComputeWorkers

        if singleBlock and nCompute > 0:
            raise ValueError("numComputeWorkers should not be specified " +
                "when singleBlockComputeWorkers is True")
        if singleBlock and nRead > 0:
            raise ValueError("singleBlockComputeWorkers must have " +
                "numReadWorkers=0")
        if self.computeWorkersRead and kind == CW_THREADS:
            raise ValueError(
                "CW_THREADS compute workers cannot do their own reading")
        if singleBlock and kind == CW_THREADS:
            raise ValueError("CW_THREADS compute workers cannot also be " +
                "singleBlockComputeWorkers")
        if nCompute > 0 and kind == CW_NONE:
            raise ValueError(
                "Compute workers requested, but no computeWorkerKind given")
        if nCompute == 0 and kind != CW_NONE:
            raise ValueError("Zero compute workers requested, but " +
                "computeWorkerKind == {}".format(kind))
        if nCompute > 0 and not self.computeWorkersRead and nRead == 0:
            raise ValueError("Multiple non-reading compute workers with " +
                "zero read workers is not a sensible choice. Best " +
                "to make numReadWorkers at least 1")
        if kind == CW_AWSBATCH and singleBlock:
            raise ValueError("AWS Batch compute workers are not suitable " +
                "for use with singleBlockComputeWorkers=True")

        # More threads than CPUs cannot help a CPU-bound computation
        from multiprocessing import cpu_count
        numCpus = cpu_count()
        if kind == CW_THREADS and nCompute > numCpus:
            raise ValueError(
                ("Number of CPUs = {}, numComputeWorkers = {}. " +
                 "For CW_THREADS, it is not sensible to have " +
                 "numComputeWorkers > numCpus").format(numCpus, nCompute))

    def __repr__(self):
        # A full reconstruction-style representation of all parameters
        return ("ConcurrencyStyle("
            f"numReadWorkers={self.numReadWorkers}, "
            f"numComputeWorkers={self.numComputeWorkers}, "
            f"computeWorkerKind={self.computeWorkerKind}, "
            f"computeWorkersRead={self.computeWorkersRead}, "
            f"singleBlockComputeWorkers={self.singleBlockComputeWorkers}, "
            f"haveSharedTemp={self.haveSharedTemp}, "
            f"readBufferInsertTimeout={self.readBufferInsertTimeout}, "
            f"readBufferPopTimeout={self.readBufferPopTimeout}, "
            f"computeBufferInsertTimeout={self.computeBufferInsertTimeout}, "
            f"computeBufferPopTimeout={self.computeBufferPopTimeout}, "
            f"computeBarrierTimeout={self.computeBarrierTimeout})")
class FilenameAssociations(object):
    """
    Associates external image filenames with internal symbolic names,
    which are then the same names used inside a function given to the
    :func:`rios.applier.apply` function.

    Each attribute created on this object should be a filename, or a
    list of filenames. The corresponding attribute names will appear on
    the 'inputs' or 'outputs' objects inside the applied function, each
    holding an image data block, or a list of image data blocks.

    Iterating the object yields one (symbolicName, sequenceNumber,
    filename) tuple per file: sequenceNumber is None for a single
    filename, or the list index for each member of a list entry.

    The object can also be indexed with a (symbolicName, sequenceNumber)
    tuple, giving the corresponding filename. If seqNum is None, or the
    index is just the symbolicName string, the full entry for that name
    is returned (a single filename or a list of filenames). Indexing is
    read-only, and cannot be used to set filenames.
    """
    def __getitem__(self, key):
        # Normalise the key into (symbolicName, seqNum)
        if isinstance(key, tuple):
            (symbolicName, seqNum) = key
        elif isinstance(key, str):
            (symbolicName, seqNum) = (key, None)
        else:
            # Unsupported key type; fails the lookup below
            symbolicName = None

        if symbolicName not in self.__dict__:
            raise KeyError(key)

        entry = self.__dict__[symbolicName]
        if isinstance(entry, str):
            # Single filename: only valid without a sequence number
            if seqNum is not None:
                raise KeyError(key)
            return entry
        if isinstance(entry, list):
            if seqNum is None:
                return entry
            if seqNum < len(entry):
                return entry[seqNum]
            raise KeyError(key)
        raise ValueError("Invalid entry for name '{}'".format(symbolicName))

    def __contains__(self, key):
        # Membership defined by whether indexing would succeed
        try:
            self[key]
            return True
        except KeyError:
            return False

    def __iter__(self):
        return FilenameAssocIterator(self)
class FilenameAssocIterator(object):
    """
    Iterator over a FilenameAssociations object.

    A separate class holds the iteration state, so that iterating a
    FilenameAssociations object leaves the original object untouched;
    each loop works on a fresh instance of this class. (This approach
    was originally suggested in the old Python docs, and still seems to
    have some merit.)

    Each iteration yields a (symbolicName, sequenceNumber, filename)
    tuple, with sequenceNumber being None for single-filename entries,
    or the list index for list entries.
    """
    def __init__(self, infiles):
        # Flatten the name/entry structure into a single list, in
        # attribute-creation order
        flat = []
        for (symbolicName, entry) in infiles.__dict__.items():
            if isinstance(entry, str):
                flat.append((symbolicName, None, entry))
            elif isinstance(entry, list):
                flat.extend((symbolicName, i, fn)
                    for (i, fn) in enumerate(entry))
        self.fullList = flat
        self.currentNdx = 0

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = self.fullList[self.currentNdx]
        except IndexError:
            raise StopIteration() from None
        self.currentNdx += 1
        return item
class BlockAssociations:
    """
    Container class to hold raster arrays for a single block.

    If the constructor is given a FilenameAssociations object, it
    populates the BlockAssociations object with None entries to match
    the same structure of names and sequences. Otherwise the object
    is empty.

    The object can be indexed, using a (symbolicName, sequenceNumber)
    tuple as a key, for both getting and setting an array of data. If
    the symbolicName corresponds to a list, sequenceNumber should be an
    integer index; for a single filename it should be None (a plain
    string key is equivalent to (key, None)).

    In order to set a value via indexing, the object should have been
    created from a corresponding FilenameAssociations object, which
    determines the structure (i.e. the valid names and index values).

    Raises KeyError for keys which do not fit the structure.
    """
    def __init__(self, fnameAssoc=None):
        # Replicate the name/sequence structure of fnameAssoc, with
        # every array initialized as None
        if fnameAssoc is not None:
            for (name, val) in fnameAssoc.__dict__.items():
                if isinstance(val, list):
                    self.__dict__[name] = [None] * len(val)
                else:
                    self.__dict__[name] = None

    @staticmethod
    def _splitKey(key):
        """
        Normalize an index key into a (symbolicName, seqNum) pair.
        Raises KeyError for unsupported key types. (Previously, an
        unsupported key type in __setitem__ left seqNum unbound, raising
        an obscure UnboundLocalError instead.)
        """
        if isinstance(key, tuple):
            (symbolicName, seqNum) = key
            return (symbolicName, seqNum)
        elif isinstance(key, str):
            return (key, None)
        raise KeyError(key)

    def __setitem__(self, key, value):
        (symbolicName, seqNum) = self._splitKey(key)
        if symbolicName in self.__dict__:
            entry = self.__dict__[symbolicName]
            if isinstance(entry, list):
                if seqNum is not None and seqNum < len(entry):
                    entry[seqNum] = value
                else:
                    raise KeyError(key)
            else:
                # Single-entry name: replace the whole entry
                self.__dict__[symbolicName] = value
        elif seqNum is None:
            # New single-entry name
            self.__dict__[symbolicName] = value
        else:
            # Unknown name with a sequence number: there is no list to
            # index into. (Previously this case was silently ignored,
            # losing the data.)
            raise KeyError(key)

    def __getitem__(self, key):
        (symbolicName, seqNum) = self._splitKey(key)
        if symbolicName not in self.__dict__:
            raise KeyError(key)
        entry = self.__dict__[symbolicName]
        if isinstance(entry, list):
            if seqNum is not None and seqNum < len(entry):
                value = entry[seqNum]
            else:
                raise KeyError(key)
        else:
            value = entry
        return value

    def __len__(self):
        # Total number of arrays, counting each list element separately
        count = 0
        for symbolicName in self.__dict__:
            entry = self.__dict__[symbolicName]
            if isinstance(entry, list):
                count += len(entry)
            else:
                count += 1
        return count
class BlockBuffer:
    """
    Buffer of blocks of data which have been read in. Blocks may exist
    but be incomplete, as individual inputs are added to them. This
    structure is shared by all read workers within a given process, so
    includes locking mechanisms to make it thread-safe.

    The buffer holds at most (2 * numWorkers) blocks; inserts beyond
    that block until a slot is freed by popping, subject to
    insertTimeout. bufferTypeName (e.g. "read" or "compute") is used
    only to name the relevant timeout in error messages.
    """
    def __init__(self, filenameAssoc, numWorkers,
            insertTimeout, popTimeout, bufferTypeName):
        self.BUFFERMAX = 2 * numWorkers
        self.lock = threading.Lock()
        # Maps blockDefn -> BlockBufferValue
        self.buffer = {}
        # Maps blockDefn -> threading.Event, set when that block is complete
        self.completionEvents = {}
        self.insertTimeout = insertTimeout
        self.popTimeout = popTimeout
        self.bufferTypeName = bufferTypeName
        # Queue of completed blockDefn objects, in completion order
        self.nextBlockQ = queue.Queue()
        self.numBlocksPopped = 0
        # This semaphore counts backwards for the number of blocks
        # currently in the buffer. A semaphore value of zero would
        # mean the buffer is full
        self.bufferCount = threading.BoundedSemaphore(self.BUFFERMAX)
        # Save the filenameAssoc so we can replicate its structure
        self.filenameAssoc = filenameAssoc

    def waitCompletion(self, blockDefn, timeout=None):
        """
        Wait until the given block is complete. Returns True if the
        block completed, False if the wait timed out.
        """
        key = blockDefn
        with self.lock:
            if key not in self.completionEvents:
                self.completionEvents[key] = threading.Event()
        blockCompleted = self.completionEvents[key].wait(timeout=timeout)
        return blockCompleted

    def addBlockData(self, blockDefn, name, seqNum, arr):
        """
        Use when building up blocks one array at a time
        """
        # Acquire (i.e. decrement) this semaphore, in case we are about
        # to add a whole new block
        acquired = self.bufferCount.acquire(timeout=self.insertTimeout)
        if not acquired:
            msg = "Timeout acquiring access to BlockBuffer."
            timeoutName = self.timeoutName("Insert")
            msg += ("\n    Try increasing {} (current value = {})\n").format(
                timeoutName, self.insertTimeout)
            raise rioserrors.TimeoutError(msg)

        with self.lock:
            if blockDefn not in self.buffer:
                self.buffer[blockDefn] = BlockBufferValue(
                    filenameAssoc=self.filenameAssoc)
            else:
                # We are not adding a new block, so release (increment)
                # the semaphore back again
                self.bufferCount.release()
            self.buffer[blockDefn].addData(name, seqNum, arr)
            if blockDefn not in self.completionEvents:
                self.completionEvents[blockDefn] = threading.Event()
            if self.buffer[blockDefn].complete():
                self.completionEvents[blockDefn].set()
                self.nextBlockQ.put(blockDefn)

    def insertCompleteBlock(self, blockDefn, blockData):
        """
        Use when inserting a complete BlockAssociations object at once
        """
        # NOTE: the timeout must be passed by keyword. Previously it was
        # passed positionally, which set Semaphore.acquire's `blocking`
        # flag instead, so a full buffer would block forever rather than
        # timing out.
        acquired = self.bufferCount.acquire(timeout=self.insertTimeout)
        if not acquired:
            msg = "Timeout acquiring access to BlockBuffer."
            timeoutName = self.timeoutName("Insert")
            msg += ("\n    Try increasing {} (current value = {})\n").format(
                timeoutName, self.insertTimeout)
            raise rioserrors.TimeoutError(msg)

        with self.lock:
            if blockDefn in self.buffer:
                # We did not actually add a new entry, so increment
                # the semaphore
                self.bufferCount.release()
            val = BlockBufferValue(blockData=blockData)
            self.buffer[blockDefn] = val
            if blockDefn not in self.completionEvents:
                self.completionEvents[blockDefn] = threading.Event()
            self.completionEvents[blockDefn].set()
            self.nextBlockQ.put(blockDefn)

    def timeoutName(self, timeoutType):
        """
        Deduce the name of the relevant timeout, using the
        bufferTypeName given to the constructor, and the type of timeout
        """
        name = "{}Buffer{}Timeout".format(self.bufferTypeName, timeoutType)
        return name

    def popCompleteBlock(self, blockDefn):
        """
        Returns the BlockAssociations object for the given blockDefn,
        and removes it from the buffer. Returns None if the block did
        not complete within the pop timeout.
        """
        completed = self.waitCompletion(blockDefn, timeout=self.popTimeout)
        if completed:
            with self.lock:
                blockData = self.buffer[blockDefn].blockData
                # Now remove this block from the buffer
                self.buffer.pop(blockDefn)
                self.completionEvents.pop(blockDefn)
                # One less block in the buffer, so increment the semaphore
                self.bufferCount.release()
                # Record how many blocks have been successfully popped. This
                # is mainly used during error reporting.
                self.numBlocksPopped += 1
        else:
            blockData = None
        return blockData

    def popNextBlock(self):
        """
        Pop the next completed block from the buffer, without regard to
        which block it is. Return a tuple of objects
        (ApplierBlockDefn, BlockAssociations)
        """
        try:
            nextBlock = self.nextBlockQ.get(timeout=self.popTimeout)
            timedout = False
        except queue.Empty:
            timedout = True

        if timedout:
            msg = ("BlockBuffer timeout. Number of blocks " +
                   "already popped: {}").format(self.numBlocksPopped)
            timeoutName = self.timeoutName("Pop")
            msg += ("\n    Try increasing {} (current value = {})\n").format(
                timeoutName, self.popTimeout)
            raise rioserrors.TimeoutError(msg)

        blockData = self.popCompleteBlock(nextBlock)
        return (nextBlock, blockData)
class BlockBufferValue:
    """
    Holds a BlockAssociations object, together with a count of how many
    of its arrays are still missing, and a lock for thread-safety. One
    instance is used per BlockAssociations object stored in a
    BlockBuffer.

    Construct with either filenameAssoc (an empty block with that
    structure, all arrays missing) or blockData (an already-complete
    block).
    """
    def __init__(self, filenameAssoc=None, blockData=None):
        self.lock = threading.Lock()
        if filenameAssoc is not None:
            data = BlockAssociations(filenameAssoc)
            missing = len(data)
        elif blockData is not None:
            (data, missing) = (blockData, 0)
        else:
            # Neither given: leave blockData/numMissing unset
            return
        self.blockData = data
        self.numMissing = missing

    def complete(self):
        # True once every expected array has been added
        return self.numMissing == 0

    def addData(self, name, seqNum, arr):
        # Store one array, under lock, and count down the missing total
        with self.lock:
            self.blockData[name, seqNum] = arr
            self.numMissing -= 1
class ApplierBlockDefn:
    """
    Defines a single block of the working grid, by its top/left position
    and its size in rows and columns. Instances are hashable (usable as
    dictionary keys) and fully ordered.
    """
    def __init__(self, top, left, nrows, ncols):
        (self.top, self.left) = (top, left)
        (self.nrows, self.ncols) = (nrows, ncols)

    def _key(self):
        # Canonical tuple used for hashing and all comparisons
        return (self.top, self.left, self.nrows, self.ncols)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        return self._key() == (other.top, other.left, other.nrows, other.ncols)

    def __lt__(self, other):
        return self._key() < (other.top, other.left, other.nrows, other.ncols)

    def __gt__(self, other):
        return self._key() > (other.top, other.left, other.nrows, other.ncols)

    def __le__(self, other):
        return self._key() <= (other.top, other.left, other.nrows, other.ncols)

    def __ge__(self, other):
        return self._key() >= (other.top, other.left, other.nrows, other.ncols)

    def __repr__(self):
        return 'ApplierBlockDefn({}, {}, {}, {})'.format(self.top,
            self.left, self.nrows, self.ncols)
class Timers:
    """
    Manage multiple named timers. See interval() method for example
    usage.

    Maintains a dictionary of pairs of start/finish times, before and
    after particular operations. These are grouped by operation names,
    and for each name, a list is accumulated of the pairs, for every
    time when this operation was carried out.

    By default the object is thread-safe, so multiple threads can
    accumulate to the same names. If withlock=False, locking is skipped
    (intended for single-threaded use). Previously, withlock=False left
    self.lock as None, and any use of interval() or merge() then crashed
    with AttributeError; a null context is now used instead.
    """
    def __init__(self, pairs=None, withlock=True):
        # pairs maps interval name -> list of (startTime, endTime) tuples
        if pairs is None:
            self.pairs = {}
        else:
            self.pairs = pairs
        if withlock:
            self.lock = threading.Lock()
        else:
            self.lock = None

    def _lockCtx(self):
        """
        Return a context manager protecting self.pairs: the real lock,
        or a no-op context when locking is disabled.
        """
        if self.lock is not None:
            return self.lock
        return contextlib.nullcontext()

    @contextlib.contextmanager
    def interval(self, intervalName):
        """
        Use as a context manager to time a particular named interval.
        Example::

            timings = Timers()
            with timings.interval('some_action'):
                # Code block required to perform the action

        After exit from the `with` statement, the timings object will have
        accumulated the start and end times around the code block. These
        will then contribute to the reporting of time intervals.
        """
        startTime = time.time()
        yield
        endTime = time.time()
        with self._lockCtx():
            if intervalName not in self.pairs:
                self.pairs[intervalName] = []
            self.pairs[intervalName].append((startTime, endTime))

    def getDurationsForName(self, intervalName):
        """
        Return a list of durations (in seconds) for the named interval,
        or None if the name has never been timed
        """
        if intervalName in self.pairs:
            intervals = [(p[1] - p[0]) for p in self.pairs[intervalName]]
        else:
            intervals = None
        return intervals

    def merge(self, other):
        """
        Merge another Timers object into this one
        """
        with self._lockCtx():
            for intervalName in other.pairs:
                if intervalName in self.pairs:
                    self.pairs[intervalName].extend(other.pairs[intervalName])
                else:
                    self.pairs[intervalName] = other.pairs[intervalName]

    def makeSummaryDict(self):
        """
        Make some summary statistics, and return them in a dictionary,
        keyed by interval name. Each value is a dictionary with keys
        'tot', 'min', 'max', 'lower', 'median', 'upper' and 'N'.
        """
        d = {}
        for name in self.pairs:
            intervals = numpy.array(self.getDurationsForName(name))
            d[name] = {
                'tot': intervals.sum(),
                'min': intervals.min(),
                'max': intervals.max(),
                'lower': numpy.percentile(intervals, 25),
                'median': numpy.percentile(intervals, 50),
                'upper': numpy.percentile(intervals, 75),
                'N': len(intervals)}
        return d
class NetworkDataChannel:
    """
    A network-visible channel to serve out all the required information to
    a group of RIOS compute workers.

    The channel has several major attributes.

    workerInitData
        a dictionary of objects which are used to initialize each
        worker. This is read-only, and cannot be modified by the workers.
    inBlockBuffer
        None, if compute workers are doing their own reading,
        otherwise it is a BlockBuffer supplying input data to the
        compute workers.
    outBlockBuffer
        a BlockBuffer where completed 'outputs' objects are
        placed, ready for writing.
    outqueue
        a Queue. It is used for any non-pixel output data
        coming from each compute worker, such as modified otherArgs
        objects. Anything in this queue will be collected up by the
        main thread after all compute workers have completed.
    forceExit
        an Event object. If set, this signals that workers should exit
        as soon as possible
    exceptionQue
        a Queue. Any exceptions raised in the worker are caught and put
        into this queue, to be dealt with in the main thread.
    workerBarrier
        a Barrier object. For the relevant compute worker kinds, all
        workers will wait at this barrier, as will the main thread,
        so that no processing starts until all compute workers are
        ready to work.

    If the constructor is given these major objects as arguments, then this
    is the server of these objects, and they are served to the network on
    a selected port number. The address of this server is available on the
    instance as hostname, portnum and authkey attributes. The server will
    create its own thread in which to run.

    A client instance can be created by giving the constructor the hostname,
    port number and authkey (obtained from the server object). This will then
    connect to the server, and make available the data attributes as given.

    The server must be shut down correctly, and so the shutdown() method
    should always be called explicitly.
    """
    def __init__(self, workerInitData=None, inBlockBuffer=None,
            outBlockBuffer=None, forceExit=None, exceptionQue=None,
            workerBarrier=None, hostname=None, portnum=None, authkey=None):
        # Local subclass, so the proxy registrations below do not
        # pollute BaseManager itself
        class DataChannelMgr(BaseManager):
            pass

        # cloudpickle is an optional dependency, needed only here
        if cloudpickle is None:
            msg = "Failed to import cloudpickle"
            raise rioserrors.UnavailableError(msg)

        if None not in (workerInitData, outBlockBuffer):
            # We were given the data objects, so this is the server side
            self.hostname = socket.gethostname()
            # Authkey is a big long random bytes string. Making one which is
            # also printable ascii.
            self.authkey = secrets.token_hex()
            # Pickled once here, and served in pickled form to workers
            self.workerInitData = cloudpickle.dumps(workerInitData)
            self.inBlockBuffer = inBlockBuffer
            self.outBlockBuffer = outBlockBuffer
            self.outqueue = queue.Queue()
            self.forceExit = forceExit
            self.exceptionQue = exceptionQue
            self.workerBarrier = workerBarrier

            # Register a getter for each shared object, as a proxy
            DataChannelMgr.register("get_workerdata",
                callable=lambda: self.workerInitData)
            DataChannelMgr.register("get_inblockbuffer",
                callable=lambda: self.inBlockBuffer)
            DataChannelMgr.register("get_outblockbuffer",
                callable=lambda: self.outBlockBuffer)
            DataChannelMgr.register("get_outqueue",
                callable=lambda: self.outqueue)
            DataChannelMgr.register("get_forceexit",
                callable=lambda: self.forceExit)
            DataChannelMgr.register("get_exceptionque",
                callable=lambda: self.exceptionQue)
            DataChannelMgr.register("get_workerbarrier",
                callable=lambda: self.workerBarrier)

            # Port number 0 lets the OS choose a free port, which we
            # then read back from the server's address
            self.mgr = DataChannelMgr(address=(self.hostname, 0),
                authkey=bytes(self.authkey, 'utf-8'))
            self.server = self.mgr.get_server()
            self.portnum = self.server.address[1]
            # Run the manager's server loop in its own single thread
            self.threadPool = futures.ThreadPoolExecutor(max_workers=1)
            self.serverThread = self.threadPool.submit(
                self.server.serve_forever)
        elif None not in (hostname, portnum, authkey):
            # We were given an address, so this is a client side.
            # Register the same names, with no callables, as pure proxies
            DataChannelMgr.register("get_workerdata")
            DataChannelMgr.register("get_outblockbuffer")
            DataChannelMgr.register("get_inblockbuffer")
            DataChannelMgr.register("get_outqueue")
            DataChannelMgr.register("get_forceexit")
            DataChannelMgr.register("get_exceptionque")
            DataChannelMgr.register("get_workerbarrier")
            self.mgr = DataChannelMgr(address=(hostname, portnum),
                authkey=authkey)
            self.hostname = hostname
            self.portnum = portnum
            self.authkey = authkey
            self.mgr.connect()
            # Get the proxy objects.
            # NOTE(review): str() of the workerdata proxy presumably
            # yields the repr of the pickled bytes, and eval() turns
            # that back into a bytes object for cloudpickle to load.
            # eval() on data received over the network is a risk worth
            # confirming/replacing upstream.
            self.workerInitData = cloudpickle.loads(eval(str(
                self.mgr.get_workerdata())))
            self.inBlockBuffer = self.mgr.get_inblockbuffer()
            self.outBlockBuffer = self.mgr.get_outblockbuffer()
            self.outqueue = self.mgr.get_outqueue()
            self.forceExit = self.mgr.get_forceexit()
            self.exceptionQue = self.mgr.get_exceptionque()
            self.workerBarrier = self.mgr.get_workerbarrier()
        else:
            msg = ("Must supply either (workerInitData, outBlockBuffer, etc.)" +
                " or ALL of (hostname, portnum and authkey)")
            raise ValueError(msg)

    def shutdown(self):
        """
        Shut down the NetworkDataChannel in the right order. This should
        always be called explicitly by the creator, when it is no longer
        needed. If left to the garbage collector and/or the interpreter
        exit code, things are shut down in the wrong order, and the
        interpreter hangs on exit.

        I have tried __del__, also weakref.finalize and atexit.register,
        and none of them avoid these problems. So, just make sure you
        call shutdown explicitly, in the process which created the
        NetworkDataChannel.

        The client processes don't seem to care, presumably because they
        are not running the server thread. Calling shutdown on the client
        does nothing.
        """
        # Only the server side has a 'server' attribute; on a client
        # this whole method is a no-op
        if hasattr(self, 'server'):
            self.server.stop_event.set()
            # Release anyone still waiting at the barrier, so the server
            # thread can finish
            if self.workerBarrier is not None:
                self.workerBarrier.abort()
            futures.wait([self.serverThread])
            self.threadPool.shutdown()

    def addressStr(self):
        """
        Return a single string encoding the network address of this channel,
        as comma-separated "hostname,portnum,authkey"
        """
        s = "{},{},{}".format(self.hostname, self.portnum, self.authkey)
        return s
class RasterizationMgr:
    """
    Manage rasterization of vector inputs, shared across multiple
    read workers within a single process. Not intended to be shared
    across compute workers on separate machines.
    """
    def __init__(self):
        self.lock = threading.Lock()
        # One lock per vector file, so different files can rasterize
        # concurrently while repeats of the same file serialize
        self.perFileLocks = {}
        # Maps vector filename -> temporary raster filename
        self.lookup = {}

    def rasterize(self, vectorfile, rasterizeOptions, tmpfileMgr):
        """
        Rasterize the given vector file, according to the given
        rasterizeOptions, returning the name of the temporary raster
        file.

        This is thread-safe. Any other thread trying to rasterize the
        same vector file will block until this has completed, and then
        be given exactly the same temporary raster file.
        """
        with self.lock:
            fileLock = self.perFileLocks.setdefault(vectorfile,
                threading.Lock())
        with fileLock:
            if vectorfile not in self.lookup:
                tmpraster = tmpfileMgr.mktempfile(prefix='rios_vecrast_',
                    suffix='.tif')
                gdal.Rasterize(tmpraster, vectorfile,
                    options=rasterizeOptions)
                self.lookup[vectorfile] = tmpraster
        return self.lookup[vectorfile]
class TempfileManager:
    """
    A single object which can keep track of all the temporary files
    created during a run. Shared between read worker threads, within
    a single process, so must be thread-safe. Not shared across
    processes.

    The constructor takes a single string for tempdir. All subsequent
    temp files are created in a new subdirectory underneath this, and
    can be removed again with cleanup().
    """
    def __init__(self, tempdir):
        self.tempsubdir = tempfile.mkdtemp(dir=tempdir, prefix='rios_')
        self.tempfileList = []
        self.lock = threading.Lock()

    def mktempfile(self, prefix=None, suffix=None):
        """
        Make a new tempfile, and return the full name
        """
        with self.lock:
            (fd, name) = tempfile.mkstemp(dir=self.tempsubdir,
                prefix=prefix, suffix=suffix)
            # mkstemp opens the file; we only want the name
            os.close(fd)
            self.tempfileList.append(name)
        return name

    def cleanup(self):
        """
        Remove all the temp files created here, then the temp
        subdirectory itself. Already-removed files are ignored.
        """
        for filename in self.tempfileList:
            with contextlib.suppress(FileNotFoundError):
                os.remove(filename)
        with contextlib.suppress(FileNotFoundError):
            os.rmdir(self.tempsubdir)

    def __del__(self):
        # Best-effort cleanup if the creator forgot to call it
        self.cleanup()
class ApplierReturn:
    """
    Holds all objects returned by the applier.apply() function.

    Fields:

    timings
        an instance of :class:`rios.structures.Timers`
    otherArgsList
        list of otherArgs objects. By default there is only one element,
        the same object passed in to apply(). These objects are not
        thread-safe, so with multiple compute workers each worker gets
        its own copy, modified independently; the copies are collected
        up after all workers finish, and the list is made available
        here for the user to merge as they see fit.
    workinggrid, singlePassMgr
        also initialized to None here; presumably populated by apply()
        (not described in this module)
    """
    def __init__(self):
        # All fields start as None, to be filled in by apply()
        for attrName in ('timings', 'otherArgsList', 'workinggrid',
                'singlePassMgr'):
            setattr(self, attrName, None)
class WorkerErrorRecord:
    """
    Hold a record of an exception raised in a remote worker, including
    its formatted traceback, the kind of worker ('read', 'compute',
    etc., as given by the caller) and an optional worker ID.
    """
    def __init__(self, exc, workerType, workerID=None):
        self.exc = exc
        self.workerType = workerType
        self.workerID = workerID
        # Use the explicit 3-argument form, which works on all supported
        # Python versions (the single-argument form requires Python 3.10+)
        self.formattedTraceback = traceback.format_exception(
            type(exc), exc, exc.__traceback__)

    def __str__(self):
        # A headline identifying the worker, followed by the full
        # traceback, one line each
        headLine = "Error in {} worker".format(self.workerType)
        if self.workerID is not None:
            headLine += " {}".format(self.workerID)
        lines = [headLine]
        lines.extend([line.strip('\n') for line in self.formattedTraceback])
        s = '\n'.join(lines) + '\n'
        return s