computemanager

class rios.computemanager.AWSBatchComputeWorkerMgr

Manage compute workers using AWS Batch.

getStackOutputs()

Helper function to query the CloudFormation stack for outputs.

Uses the RIOS_AWSBATCH_STACK and RIOS_AWSBATCH_REGION env vars to determine which stack and region to query.
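As a rough illustration of the lookup described above, the sketch below reads the two environment variables and flattens a CloudFormation-style response into a plain dictionary. It is not the RIOS implementation: the helper names, the placeholder defaults, and the output keys in the sample response are all hypothetical. The response shape follows what boto3's `describe_stacks` call returns, and a hand-written stand-in is used here so that the example runs without AWS access.

```python
import os


def stack_and_region(environ=os.environ):
    """Read the stack name and region from the environment.

    The variable names come from the description above; the fallback
    values are placeholders for illustration, not RIOS defaults.
    """
    stack = environ.get('RIOS_AWSBATCH_STACK', 'HYPOTHETICAL-STACK')
    region = environ.get('RIOS_AWSBATCH_REGION', 'HYPOTHETICAL-REGION')
    return stack, region


def stack_outputs_from_response(response):
    """Flatten a describe_stacks-style response into {OutputKey: OutputValue}."""
    outputs = {}
    for out in response['Stacks'][0].get('Outputs', []):
        outputs[out['OutputKey']] = out['OutputValue']
    return outputs


# Hand-written stand-in for a CloudFormation response. With boto3 it would
# come from something like:
#   boto3.client('cloudformation', region_name=region).describe_stacks(StackName=stack)
# The output keys below are invented for this example.
fake_response = {
    'Stacks': [{
        'Outputs': [
            {'OutputKey': 'HypotheticalJobQueue', 'OutputValue': 'queue-a'},
            {'OutputKey': 'HypotheticalJobDefinition', 'OutputValue': 'jobdef-a'},
        ]
    }]
}

outputs = stack_outputs_from_response(fake_response)
print(outputs['HypotheticalJobQueue'])
```

Keeping the environment lookup separate from the response parsing makes each part easy to exercise on its own.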

shutdown()

Shut down the job pool

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, computeWorkersRead=False, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)

Start <numWorkers> AWS Batch jobs to process blocks of data

computeWorkerKind = 'CW_AWSBATCH'

class rios.computemanager.ClassicBatchComputeWorkerMgr

Manage compute workers using a classic batch queue, notably PBS or SLURM. The class is initially constructed with computeWorkerKind = None; computeWorkerKind must then be assigned as either CW_PBS or CW_SLURM before use.

beginScript(logfile)

Return list of initial script commands, depending on whether we are PBS or SLURM

checkBatchSystemAvailable()

Check whether the selected batch queue system is available. If not, raise UnavailableError

findExtraErrors()

Look for errors in the log files. These would be errors which were not reported via the data channel

static findLine(linelist, s)

Find the first line which begins with the given string. Return the index of that line, or None if not found.
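The behaviour described above can be sketched in a few lines. This is an illustrative re-implementation based only on the description, not the RIOS source; the function name `find_line` and the sample log lines are made up for the example.

```python
def find_line(linelist, s):
    """Return the index of the first line starting with s, or None."""
    for i, line in enumerate(linelist):
        if line.startswith(s):
            return i
    return None


# Invented log lines, standing in for the contents of a worker log file
lines = [
    "INFO starting worker",
    "Traceback (most recent call last):",
    "Traceback (second occurrence)",
]

print(find_line(lines, "Traceback"))
print(find_line(lines, "ERROR"))
```

Returning None rather than -1 for "not found" means the caller should test the result with `is None`, since index 0 is a valid match.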

getJobId(stdout)

Extract the jobId from the string returned when the job is submitted, depending on whether we are PBS or SLURM
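The sketch below shows one plausible way to do this extraction. It assumes the usual behaviour of the two submit commands: PBS `qsub` prints just the job identifier, while SLURM `sbatch` prints a line of the form "Submitted batch job <id>". Whether RIOS parses the output exactly this way is an assumption, and the function name and the explicit `kind` argument are hypothetical.

```python
def get_job_id(stdout, kind):
    """Pull a job id out of the submit command's output (illustrative only)."""
    text = stdout.strip()
    if not text:
        # Nothing was printed, so there is no id to extract
        return None
    if kind == 'CW_PBS':
        # Assumed: qsub prints only the job identifier
        return text
    if kind == 'CW_SLURM':
        # Assumed: sbatch prints "Submitted batch job <id>"
        return text.split()[-1]
    raise ValueError("Unknown batch kind: {!r}".format(kind))


# Invented sample outputs
print(get_job_id("12345.hypothetical-server\n", 'CW_PBS'))
print(get_job_id("Submitted batch job 67890\n", 'CW_SLURM'))
```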

getQlistHeaderCount()

Number of lines to skip at the head of the qlist output

getQueueCmd()

Return the command name for listing the current jobs in the batch queue, depending on whether we are PBS or SLURM. Return as a list of words, ready to give to Popen.

getSubmitCmd()

Return the command name for submitting a job, depending on whether we are PBS or SLURM. Return as a list of words, ready to give to Popen.
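To illustrate the "list of words, ready to give to Popen" convention used by getQueueCmd and getSubmitCmd, here is a minimal sketch of a per-system command table. The command names are the standard ones for each scheduler (`qsub` and `qstat` for PBS, `sbatch` and `squeue` for SLURM), but any extra options RIOS may pass are not shown, and the table and function names are hypothetical.

```python
# Standard command names for each scheduler; any additional flags that
# RIOS might add are deliberately left out of this sketch.
BATCH_COMMANDS = {
    'CW_PBS': {'submit': ['qsub'], 'queue': ['qstat']},
    'CW_SLURM': {'submit': ['sbatch'], 'queue': ['squeue']},
}


def get_submit_cmd(kind):
    """Return the job submission command as a list of words."""
    # Copy the list so callers can append arguments without
    # modifying the shared table
    return list(BATCH_COMMANDS[kind]['submit'])


def get_queue_cmd(kind):
    """Return the queue listing command as a list of words."""
    return list(BATCH_COMMANDS[kind]['queue'])


print(get_submit_cmd('CW_SLURM'))
print(get_queue_cmd('CW_PBS'))
```

A list of words can be passed directly as the first argument of `subprocess.Popen`, which avoids shell quoting issues.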

shutdown()

Shut down the compute manager. Wait for the batch jobs to finish, then shut down the data channel.

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, computeWorkersRead=False, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)

Start <numWorkers> PBS or SLURM jobs to process blocks of data

waitOnJobs()

Wait for all batch jobs to complete

worker(workerID, tmpfileMgr)

Assemble a worker job and submit it to the batch queue

computeWorkerKind = None

class rios.computemanager.ComputeWorkerManager

Abstract base class for all compute-worker manager subclasses

A subclass implements a particular way of managing RIOS compute workers. It should override all abstract methods given here.
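The general pattern of an abstract manager with concrete subclasses can be sketched with Python's `abc` module. This is a simplified stand-in rather than the RIOS class itself: the class names, the reduced `startWorkers` signature, and the 'CW_HYPOTHETICAL' kind are all invented for illustration.

```python
from abc import ABC, abstractmethod


class SketchWorkerManager(ABC):
    """Simplified stand-in for an abstract compute-worker manager."""
    computeWorkerKind = 'CW_NONE'

    @abstractmethod
    def startWorkers(self, numWorkers=None, **kwargs):
        """Start the specified compute workers."""

    @abstractmethod
    def shutdown(self):
        """Shut down the manager."""


class EchoWorkerManager(SketchWorkerManager):
    """Toy subclass that only records what it was asked to do."""
    computeWorkerKind = 'CW_HYPOTHETICAL'

    def __init__(self):
        self.started = 0
        self.running = False

    def startWorkers(self, numWorkers=None, **kwargs):
        self.started = numWorkers or 0
        self.running = True

    def shutdown(self):
        self.running = False


mgr = EchoWorkerManager()
mgr.startWorkers(numWorkers=4)
print(mgr.started, mgr.running)
mgr.shutdown()
```

Because both methods are marked abstract, trying to instantiate `SketchWorkerManager` directly raises TypeError, which catches a subclass that forgets to override one of them.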

makeOutObjList()

Make a list of all the objects the workers put into outqueue on completion

setupNetworkCommunication(userFunction, infiles, outfiles, otherArgs, controls, workinggrid, allInfo, blockList, numWorkers, inBlockBuffer, outBlockBuffer, forceExit, exceptionQue, workerBarrier)

Set up the standard methods of network communication between the workers and the main thread. This is expected to be the same for all workers running on separate machines from the main thread.

Creates the dataChan and outqueue attributes.

This routine is not needed for the Threads subclass, because it does not use the network versions of these communications.

abstract shutdown()

Shut down the compute-worker manager.

abstract startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, computeWorkersRead=False, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)

Start the specified compute workers

computeWorkerKind = 'CW_NONE'
outObjList = None
outqueue = None

class rios.computemanager.SubprocComputeWorkerManager

Purely for testing, not for normal use.

This class manages compute workers run through subprocess.Popen. This is not normally any improvement over using CW_THREADS, and should be avoided. I am using this purely as a test framework to emulate the batch queue types of compute worker, which are similarly disconnected from the main process. This lets me work out the right mechanisms for exception handling and the like, and check that the rios_computeworker command line works.

findExtraErrors()

Check for errors in any worker stderr. These would be errors not reported via the data channel

shutdown()

Shut down the compute manager. Wait for the worker jobs to finish, then shut down the data channel.

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, computeWorkersRead=False, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)

Start the specified compute workers

waitOnJobs()

Wait for all worker subprocesses to complete

worker(workerID)

Start one worker

computeWorkerKind = 'CW_SUBPROC'

class rios.computemanager.ThreadsComputeWorkerMgr

Manage compute workers using threads within the current process.

shutdown()

Shut down the thread pool

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, computeWorkersRead=False, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)

Start <numWorkers> threads to process blocks of data

worker(userFunction, infiles, outfiles, otherArgs, controls, allInfo, workinggrid, blockList, inBlockBuffer, outBlockBuffer, outqueue, workerID, exceptionQue)

This function is a worker for a single thread, with no reading or writing going on. All I/O is via the inBlockBuffer and outBlockBuffer objects.
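The idea of a worker that does no file I/O and only moves data between an input buffer and an output buffer can be sketched as follows. This is a generic illustration, not the RIOS worker: `queue.Queue` stands in for the real inBlockBuffer and outBlockBuffer objects (whose actual types are not described here), and the reduced signature, the None sentinel, and the `run_blocks` helper are all assumptions.

```python
import queue
import threading


def worker(userFunction, inBlockBuffer, outBlockBuffer):
    """Take blocks from the input buffer, process them, and put the
    results into the output buffer. No file reading or writing here."""
    while True:
        item = inBlockBuffer.get()
        if item is None:
            # Sentinel value: no more blocks for this worker
            break
        blockId, data = item
        outBlockBuffer.put((blockId, userFunction(data)))


def run_blocks(blocks, userFunction, numWorkers=2):
    """Process all blocks with numWorkers threads, returning results
    in the original block order."""
    inBuf = queue.Queue()
    outBuf = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(userFunction, inBuf, outBuf))
        for _ in range(numWorkers)
    ]
    for t in threads:
        t.start()
    for item in enumerate(blocks):
        inBuf.put(item)
    for _ in threads:
        inBuf.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    # All workers have finished, so the output buffer is complete
    results = {}
    while not outBuf.empty():
        blockId, value = outBuf.get()
        results[blockId] = value
    return [results[i] for i in range(len(blocks))]


print(run_blocks([1, 2, 3, 4], lambda x: x * x))
```

Tagging each result with its block index lets the caller restore the original order, since threads may finish blocks in any order.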

computeWorkerKind = 'CW_THREADS'

rios.computemanager.getComputeWorkerManager(cwKind)

Returns a compute-worker manager object of the requested kind.
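One common way to build such a factory is to look up the class by its computeWorkerKind attribute. The sketch below shows that pattern with two toy classes; it is an assumption about the general approach, not a copy of the RIOS function, and the class names, registry, and error handling are hypothetical.

```python
class ThreadsMgrSketch:
    """Toy stand-in for a threads-based manager."""
    computeWorkerKind = 'CW_THREADS'


class SubprocMgrSketch:
    """Toy stand-in for a subprocess-based manager."""
    computeWorkerKind = 'CW_SUBPROC'


# Map each kind string to the class that handles it
_REGISTRY = {
    cls.computeWorkerKind: cls
    for cls in (ThreadsMgrSketch, SubprocMgrSketch)
}


def get_manager(cwKind):
    """Return a new manager object of the requested kind."""
    try:
        cls = _REGISTRY[cwKind]
    except KeyError:
        raise ValueError("Unknown compute-worker kind: {!r}".format(cwKind)) from None
    return cls()


mgr = get_manager('CW_THREADS')
print(type(mgr).__name__)
```

Keying the registry on the class attribute keeps the kind string defined in one place per class.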