computemanager

The ComputeWorkerManager abstract base class is for creating and managing a set of compute workers. Each different computeWorkerKind is implemented as a concrete subclass of this. All these classes are found in this module.

class rios.computemanager.AWSBatchComputeWorkerMgr[source]

Manage compute workers using AWS Batch.

Obsolete, and likely to be deprecated. See ECSComputeWorkerMgr instead.

getStackOutputs()[source]

Helper function to query the CloudFormation stack for outputs.

Uses the RIOS_AWSBATCH_STACK and RIOS_AWSBATCH_REGION env vars to determine which stack and region to query.
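The lookup described above can be sketched as follows. This is not the RIOS implementation: the helper names `stackAndRegionFromEnv` and `outputsToDict` are hypothetical, and the response is a hand-written stand-in which follows the general shape returned by the boto3 CloudFormation `describe_stacks` call, with made-up output keys.

```python
import os


def stackAndRegionFromEnv(environ=os.environ):
    """
    Return (stackName, region) from the two environment variables.
    Hypothetical helper; raises KeyError if either one is unset.
    """
    return (environ['RIOS_AWSBATCH_STACK'], environ['RIOS_AWSBATCH_REGION'])


def outputsToDict(response):
    """
    Flatten the 'Outputs' list of the first stack in a describe_stacks
    response into a {OutputKey: OutputValue} dictionary.
    """
    outputs = response['Stacks'][0].get('Outputs', [])
    return {o['OutputKey']: o['OutputValue'] for o in outputs}


# Hand-written stand-in for a describe_stacks response (keys are made up)
fakeResponse = {
    'Stacks': [{
        'StackName': 'example-stack',
        'Outputs': [
            {'OutputKey': 'ExampleQueue', 'OutputValue': 'queue-name'},
            {'OutputKey': 'ExampleJobDef', 'OutputValue': 'jobdef-name'},
        ],
    }]
}
stackOutputs = outputsToDict(fakeResponse)
```

With boto3 installed, a real response would presumably come from `boto3.client('cloudformation', region_name=region).describe_stacks(StackName=stackName)`.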

shutdown()[source]

Shut down the job pool

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start <numWorkers> AWS Batch jobs to process blocks of data

computeWorkerKind = 'CW_AWSBATCH'
computeWorkersRead_default = True

class rios.computemanager.ClassicBatchComputeWorkerMgr[source]

Manage compute workers using a classic batch queue, notably PBS or SLURM. The object is initially constructed with computeWorkerKind = None; computeWorkerKind must then be set to either CW_PBS or CW_SLURM before use.

Makes use of the computeWorkerExtraParams argument to ConcurrencyStyle, if given, but this is optional. If given, it should be a dictionary; see Concurrency for details.

beginScript(logfile, workerID)[source]

Return list of initial script commands, depending on whether we are PBS or SLURM

checkBatchSystemAvailable()[source]

Check whether the selected batch queue system is available. If not, raise UnavailableError

findExtraErrors()[source]

Look for errors in the log files. These would be errors which were not reported via the data channel

static findLine(linelist, s)[source]

Find the first line which begins with the given string. Return the index of that line, or None if not found.
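The behaviour described for findLine can be illustrated with a minimal stand-alone function. This is a sketch written from the description above, not a copy of the RIOS source, and the example log lines are invented.

```python
def findLine(linelist, s):
    """
    Return the index of the first line in linelist which begins with
    the string s, or None if no line does.
    """
    for (i, line) in enumerate(linelist):
        if line.startswith(s):
            return i
    return None


# Invented example lines
lines = ["Begin-time: 12:00", "Traceback (most recent call last):", "End"]
firstTraceback = findLine(lines, "Traceback")
missing = findLine(lines, "Nothing")
```

Because index 0 is a valid result, callers should compare the return value against None explicitly, rather than testing its truthiness.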

getJobId(stdout)[source]

Extract the jobId from the string returned when the job is submitted, depending on whether we are PBS or SLURM
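As a rough illustration of why the extraction depends on the batch system: sbatch normally prints a sentence such as "Submitted batch job 12345", while qsub normally prints just the job identifier. The sketch below assumes those two output formats and assumes the kind strings are 'CW_PBS' and 'CW_SLURM'. The function name `getJobIdSketch` is hypothetical, and the real method may parse the output differently.

```python
def getJobIdSketch(stdout, kind):
    """
    Extract a job ID string from the text printed at submission.
    kind is 'CW_PBS' or 'CW_SLURM' (assumed values). Returns None if
    the text is empty.
    """
    text = stdout.strip()
    if not text:
        return None
    if kind == 'CW_SLURM':
        # Assumed format: "Submitted batch job <id>"
        return text.split()[-1]
    elif kind == 'CW_PBS':
        # Assumed format: the job identifier alone on the first line
        return text.splitlines()[0].strip()
    raise ValueError("Unknown kind {!r}".format(kind))


slurmId = getJobIdSketch("Submitted batch job 12345\n", 'CW_SLURM')
pbsId = getJobIdSketch("6789.pbsserver\n", 'CW_PBS')
```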

getQlistHeaderCount()[source]

Number of lines to skip at the head of the qlist output

getQueueCmd()[source]

Return the command name for listing the current jobs in the batch queue, depending on whether we are PBS or SLURM. Return as a list of words, ready to give to Popen.

getSubmitCmd()[source]

Return the command name for submitting a job, depending on whether we are PBS or SLURM. Return as a list of words, ready to give to Popen.
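The phrase "a list of words, ready to give to Popen" in getQueueCmd and getSubmitCmd can be illustrated as below. The command names are the standard ones for each system (qsub and qstat for PBS, sbatch and squeue for SLURM), but any extra flags RIOS may add are not shown, and the names `submitCmdSketch`, `queueCmdSketch` and `runCmd` are hypothetical.

```python
import subprocess

# Assumed command names; the real methods may add further options
SUBMIT_CMD = {'CW_PBS': ['qsub'], 'CW_SLURM': ['sbatch']}
QUEUE_CMD = {'CW_PBS': ['qstat'], 'CW_SLURM': ['squeue']}


def submitCmdSketch(kind):
    "Return a copy of the submit command words for this kind"
    return list(SUBMIT_CMD[kind])


def queueCmdSketch(kind):
    "Return a copy of the queue-listing command words for this kind"
    return list(QUEUE_CMD[kind])


def runCmd(cmdWords):
    """
    Run a command given as a list of words, returning (stdout, stderr)
    as strings. No shell is involved, so no quoting is needed.
    """
    proc = subprocess.Popen(cmdWords, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, universal_newlines=True)
    return proc.communicate()
```

Returning a list rather than a single string means the caller can append further arguments, such as a script file name, without worrying about shell quoting.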

shutdown()[source]

Shut down the compute manager. Wait on batch jobs, then shut down the data channel

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start <numWorkers> PBS or SLURM jobs to process blocks of data

waitOnJobs()[source]

Wait for all batch jobs to complete

worker(workerID, tmpfileMgr)[source]

Assemble a worker job and submit it to the batch queue

computeWorkerKind = None
computeWorkersRead_default = True

class rios.computemanager.ComputeWorkerManager[source]

Abstract base class for all compute-worker manager subclasses

A subclass implements a particular way of managing RIOS compute-workers. It should over-ride all abstract methods given here.
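The subclassing pattern can be sketched with Python's abc module. The classes below are toy stand-ins, not RIOS classes: startWorkers is cut down to a single argument, and 'CW_TOY' is a made-up kind rather than a RIOS constant.

```python
from abc import ABC, abstractmethod


class ManagerSketch(ABC):
    """Stand-in for an abstract compute-worker manager"""
    computeWorkerKind = 'CW_NONE'

    @abstractmethod
    def startWorkers(self, numWorkers=None):
        "Start the specified compute workers"

    @abstractmethod
    def shutdown(self):
        "Shut down the manager"


class ListManagerSketch(ManagerSketch):
    """Toy concrete subclass, which only records worker IDs"""
    computeWorkerKind = 'CW_TOY'     # Made-up kind, not a RIOS constant

    def startWorkers(self, numWorkers=None):
        self.workerList = list(range(numWorkers))

    def shutdown(self):
        self.workerList = []


mgr = ListManagerSketch()
mgr.startWorkers(numWorkers=3)
numStarted = len(mgr.workerList)
mgr.shutdown()

# The abstract class itself cannot be instantiated
try:
    ManagerSketch()
    baseInstantiated = True
except TypeError:
    baseInstantiated = False
```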

getWorkerName(workerID)[source]

Return a string which uniquely identifies each worker, including the jobName, if given.

makeOutObjList()[source]

Make a list of all the objects the workers put into outqueue on completion

setJobName(jobName)[source]

Sets the job name string, which is made available to worker processes. Defaults to None, and has only cosmetic effects.

setupNetworkCommunication(userFunction, infiles, outfiles, otherArgs, controls, workinggrid, allInfo, blockList, numWorkers, inBlockBuffer, outBlockBuffer, forceExit, exceptionQue, workerBarrier)[source]

Set up the standard methods of network communication between the workers and the main thread. This is expected to be the same for all workers running on separate machines from the main thread.

Creates the dataChan and outqueue attributes.

This routine is not needed for the Threads subclass, because it does not use the network versions of these communications.

abstract shutdown()[source]

Shut down the computeWorkerManager

abstract startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start the specified compute workers

computeWorkerKind = 'CW_NONE'
computeWorkersRead_default = None
jobName = None
outObjList = None
outqueue = None

class rios.computemanager.ECSComputeWorkerMgr[source]

Manage compute workers using Amazon AWS ECS

New in version 2.0.7

Requires some extra parameters in the ConcurrencyStyle constructor (computeWorkerExtraParams), in order to configure the AWS infrastructure. This class provides some helper functions for creating these for various use cases.

When creating a private cluster of EC2 instances, these are automatically tagged with some AWS tags. See Concurrency doc page for details.

checkTaskErrors()[source]

Check for errors in any of the worker tasks, and report to stderr.

createCluster()[source]

If requested to do so, create an ECS cluster to run on.

createTaskDef()[source]

If requested to do so, create a task definition for the worker tasks

getClusterInstanceCount(clusterName)[source]

Query the given cluster, and return the number of instances it has. If the cluster does not exist, return None.

getClusterTaskCount()[source]

Query the cluster, and return the number of tasks it has. This is the total of running and pending tasks. If the cluster does not exist, return None.

static makeExtraParams_Fargate(jobName=None, containerImage=None, taskRoleArn=None, executionRoleArn=None, subnets=None, subnet=None, securityGroups=None, cpu='0.5 vCPU', memory='1GB', cpuArchitecture=None, cloudwatchLogGroup=None, tags=None)[source]

Helper function to construct a minimal computeWorkerExtraParams dictionary suitable for using ECS with Fargate launchType, given just the bare essential information.

Returns a Python dictionary.

jobName: str

Arbitrary string, optional. If given, this name will be incorporated into some AWS/ECS names for the compute workers, including the container name and the task family name.

containerImage: str

Required. URI of the container image to use for compute workers. This container must have RIOS installed. It can be the same container as used for the main script, as the entry point is over-written.

executionRoleArn: str

Required. ARN for an AWS role. This allows ECS to use AWS services on your behalf. A good start is a role including AmazonECSTaskExecutionRolePolicy, which allows access to ECR container registries and CloudWatch logs.

taskRoleArn: str

Required. ARN for an AWS role. This allows your code to use AWS services. This role should include policies such as AmazonS3FullAccess, covering any AWS services your compute workers will need.

subnet: str

Required. Subnet ID string associated with the VPC in which workers will run.

subnets: list of str

Deprecated. List of subnet ID strings associated with the VPC in which workers will run. This is an alternative to specifying a single subnet, but is deprecated, and should not be used. As far as we know, there is no good reason to spread workers across multiple subnets.

securityGroups: list of str

Required. List of security group IDs associated with the VPC.

cpu: str

Number of CPU units requested for each compute worker, expressed in AWS’s own units. For example, ‘0.5 vCPU’, or ‘512’ (which corresponds to the same thing, as 1024 CPU units make one vCPU). Both must be strings. This helps Fargate to select a suitable VM instance type (see below).

memory: str

Amount of memory requested for each compute worker, expressed in MiB, or with a units suffix. For example, ‘1024’ or its equivalent ‘1GB’. This helps Fargate to select a suitable VM instance type (see below).

cpuArchitecture: str

If given, selects the CPU architecture of the hosts to run workers on. Can be ‘ARM64’, defaults to ‘X86_64’.

cloudwatchLogGroup: str or None

Optional. Name of CloudWatch log group. If not None, each worker sends a log stream of its stdout & stderr to this log group. The group should already exist. If None, no CloudWatch logging is done. Intended for tracking obscure problems, rather than to use permanently.

tags: dict or None

Optional. If specified this needs to be a dictionary of key/value pairs which will be turned into AWS tags. These will be added to the ECS cluster, task definition and tasks. The keys and values must all be strings. Requires ecs:TagResource permission.

Only certain combinations of cpu and memory are allowed, as these are used by Fargate to select a suitable VM instance type. See ECS.Client.run_task() documentation for further details.
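A hedged sketch of how these parameters might be used follows. The `cpuToUnits` helper is hypothetical, and only shows how the two cpu notations relate (AWS counts 1024 CPU units per vCPU). The `makeFargateStyle` function uses the signature given above, but every ID and ARN is a placeholder, and it assumes that ConcurrencyStyle and the CW_ECS constant are available from rios.applier, as described on the Concurrency page.

```python
def cpuToUnits(cpu):
    """
    Convert a cpu string to an integer count of AWS CPU units,
    where 1024 units is one vCPU. Accepts either the '0.5 vCPU'
    style or a plain units string like '512'.
    """
    cpu = cpu.strip()
    if cpu.lower().endswith('vcpu'):
        numVcpu = float(cpu[:-4])
        return int(round(numVcpu * 1024))
    return int(cpu)


def makeFargateStyle(numComputeWorkers):
    """
    Build a ConcurrencyStyle for Fargate workers. Every ID and ARN
    below is a placeholder, to be replaced with real values.
    """
    # Imported here so the rest of this sketch runs without RIOS
    from rios import applier
    from rios.computemanager import ECSComputeWorkerMgr

    extraParams = ECSComputeWorkerMgr.makeExtraParams_Fargate(
        jobName='myjob',
        containerImage='<container image URI>',
        executionRoleArn='<execution role ARN>',
        taskRoleArn='<task role ARN>',
        subnet='<subnet ID>',
        securityGroups=['<security group ID>'],
        cpu='0.5 vCPU', memory='1GB')
    # Assumes applier exposes ConcurrencyStyle and CW_ECS
    return applier.ConcurrencyStyle(
        numComputeWorkers=numComputeWorkers,
        computeWorkerKind=applier.CW_ECS,
        computeWorkerExtraParams=extraParams)


halfVcpu = cpuToUnits('0.5 vCPU')
```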

static makeExtraParams_PrivateCluster(jobName=None, numInstances=None, ami=None, instanceType=None, containerImage=None, taskRoleArn=None, executionRoleArn=None, subnet=None, securityGroups=None, instanceProfileArn=None, memoryReservation=1024, cloudwatchLogGroup=None, tags=None)[source]

Helper function to construct a basic computeWorkerExtraParams dictionary suitable for using ECS with a private per-job cluster, given just the bare essential information.

Returns a Python dictionary.

jobName: str

Arbitrary string, optional. If given, this name will be incorporated into some AWS/ECS names for the compute workers, including the container name and the task family name.

numInstances: int

Number of VM instances which will comprise the private ECS cluster. The RIOS compute workers will be distributed across these, so it makes sense to have the same number of instances as compute workers, i.e. one worker on each instance.

ami: str

Amazon Machine Image ID string. This should be for an ECS-Optimized machine image, either as supplied by AWS, or custom-built, but it must have the ECS Agent installed. An example would be “ami-00065bb22bcbffde0”, which is an AWS-supplied ECS-Optimized image.

instanceType: str

The string identifying the instance type for the VM instances which will make up the ECS cluster. An example would be “a1.medium”.

containerImage: str

Required. URI of the container image to use for compute workers. This container must have RIOS installed. It can be the same container as used for the main script, as the entry point is over-written.

executionRoleArn: str

Required. ARN for an AWS role. This allows ECS to use AWS services on your behalf. A good start is a role including AmazonECSTaskExecutionRolePolicy, which allows access to ECR container registries and CloudWatch logs.

taskRoleArn: str

Required. ARN for an AWS role. This allows your code to use AWS services. This role should include policies such as AmazonS3FullAccess, covering any AWS services your compute workers will need.

subnet: str

Required. A subnet ID string associated with the VPC in which workers will run.

securityGroups: list of str

Required. List of security group IDs associated with the VPC.

instanceProfileArn: str

The IamInstanceProfile ARN to use for the VM instances. This should include AmazonEC2ContainerServiceforEC2Role policy, which allows the instances to be part of an ECS cluster.

memoryReservation: int

Optional. Memory (in MiB) reserved for the containers in each compute worker. This should be small enough to fit well inside the memory of the VM on which it is running. Often best to leave this as default until out-of-memory errors occur, then increase.

cloudwatchLogGroup: str or None

Optional. Name of CloudWatch log group. If not None, each worker sends a log stream of its stdout & stderr to this log group. The group should already exist. If None, no CloudWatch logging is done. Intended for tracking obscure problems, rather than to use permanently.

tags: dict or None

Optional. If specified this needs to be a dictionary of key/value pairs which will be turned into AWS tags. These will be added to the ECS cluster, task definition and tasks, and the EC2 instances. The keys and values must all be strings. Requires ecs:TagResource permission.
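Since the tags dictionary must contain only string keys and values, it can be worth checking it before passing it in. The sketch below is one possible check, with the hypothetical helper names `checkTags` and `tagsToAwsList`. The list-of-dictionaries form with lowercase 'key' and 'value' is the one the boto3 ECS client takes for its tags arguments; whether RIOS performs the conversion in this way is an assumption.

```python
def checkTags(tags):
    """
    Raise ValueError unless tags is None, or a dictionary in which
    every key and every value is a string.
    """
    if tags is None:
        return
    if not isinstance(tags, dict):
        raise ValueError("tags must be a dictionary or None")
    for (key, value) in tags.items():
        if not (isinstance(key, str) and isinstance(value, str)):
            raise ValueError(
                "Tag {!r}: {!r} must have string key and value".format(
                    key, value))


def tagsToAwsList(tags):
    """
    Convert a tags dictionary to a list of {'key': ..., 'value': ...}
    dictionaries. None gives an empty list.
    """
    checkTags(tags)
    return [{'key': k, 'value': v} for (k, v) in (tags or {}).items()]


awsTags = tagsToAwsList({'project': 'demo'})
```

A common mistake is to give a number as a tag value, for example {'count': 1}; the check above rejects that, and the value should be written as '1' instead.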

static makeJobIDstr(jobName)[source]

Make a job ID string to use in various generated names. It is unique to this run, and also includes any human-readable information available

runInstances(numWorkers)[source]

If requested to do so, run the instances required to populate the cluster

shutdown()[source]

Shut down the workers

shutdownCluster()[source]

Shut down the ECS cluster, if one has been created

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start <numWorkers> ECS tasks to process blocks of data

waitClusterInstanceCount(clusterName, endInstanceCount)[source]

Poll the given cluster until the instanceCount is equal to the given endInstanceCount
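The polling pattern can be sketched generically, as below. This is not the RIOS code: the function `waitForCount` and its arguments are hypothetical, the count comes from an injected callable rather than from ECS, and the default timeout of 300 assumes that defaultWaitClusterInstanceCountTimeout is measured in seconds.

```python
import time


def waitForCount(getCount, endCount, timeout=300, interval=5,
        sleep=time.sleep, clock=time.monotonic):
    """
    Call getCount() repeatedly until it returns endCount, sleeping
    interval seconds between calls. Return True on success, or False
    if timeout seconds pass first.
    """
    startTime = clock()
    while True:
        if getCount() == endCount:
            return True
        if (clock() - startTime) >= timeout:
            return False
        sleep(interval)


# Fake count source which reports 0, then 1, then 2 from then on
fakeCounts = [0, 1, 2]


def fakeGetCount():
    if len(fakeCounts) > 1:
        return fakeCounts.pop(0)
    return fakeCounts[0]


reached = waitForCount(fakeGetCount, 2, timeout=10, interval=0)
timedOut = not waitForCount(lambda: 0, 5, timeout=0, interval=0)
```

Passing the count source in as a callable keeps the loop testable without any AWS access.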

waitClusterTasksFinished()[source]

Poll the cluster until the number of tasks reaches zero

computeWorkerKind = 'CW_ECS'
computeWorkersRead_default = True
defaultWaitClusterInstanceCountTimeout = 300

class rios.computemanager.SubprocComputeWorkerManager[source]

Purely for testing, not for normal use.

This class manages compute workers run through subprocess.Popen. This is not normally any improvement over using CW_THREADS, and should be avoided. I am using this purely as a test framework to emulate the batch queue types of compute worker, which are similarly disconnected from the main process, so I can work out the right mechanisms to use for exception handling and such like, and making sure the rios_computeworker command line works.

findExtraErrors()[source]

Check for errors in any worker stderr. These would be errors not reported via the data channel

shutdown()[source]

Shut down the compute manager. Wait on batch jobs, then shut down the data channel

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start the specified compute workers

waitOnJobs()[source]

Wait for all worker subprocesses to complete

worker(workerID)[source]

Start one worker

computeWorkerKind = 'CW_SUBPROC'
computeWorkersRead_default = False

class rios.computemanager.ThreadsComputeWorkerMgr[source]

Manage compute workers using threads within the current process.

shutdown()[source]

Shut down the thread pool

startWorkers(numWorkers=None, userFunction=None, infiles=None, outfiles=None, otherArgs=None, controls=None, blockList=None, inBlockBuffer=None, outBlockBuffer=None, workinggrid=None, allInfo=None, singleBlockComputeWorkers=False, tmpfileMgr=None, haveSharedTemp=True, exceptionQue=None)[source]

Start <numWorkers> threads to process blocks of data

worker(userFunction, infiles, outfiles, otherArgs, controls, allInfo, workinggrid, blockList, inBlockBuffer, outBlockBuffer, outqueue, workerID, exceptionQue)[source]

This function is a worker for a single thread, with no reading or writing going on. All I/O is via the inBlockBuffer and outBlockBuffer objects.
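The idea of a compute-only thread, which takes input blocks from one buffer and puts results in another, can be sketched as below. Plain queue.Queue objects stand in for the RIOS block buffers, which are different objects with their own interface, and `workerSketch` with its None sentinel is a hypothetical simplification.

```python
import queue
import threading


def workerSketch(userFunction, inBuffer, outBuffer, workerID):
    """
    Take (blockID, data) items from inBuffer until a None sentinel is
    seen, apply userFunction, and put (blockID, result) in outBuffer.
    """
    while True:
        item = inBuffer.get()
        if item is None:
            break
        (blockID, data) = item
        outBuffer.put((blockID, userFunction(data)))


numWorkers = 3
inBuffer = queue.Queue()
outBuffer = queue.Queue()
threadList = [
    threading.Thread(target=workerSketch,
        args=(lambda x: x * 2, inBuffer, outBuffer, i))
    for i in range(numWorkers)]
for t in threadList:
    t.start()

# Feed five blocks, then one sentinel per worker
for blockID in range(5):
    inBuffer.put((blockID, blockID + 10))
for i in range(numWorkers):
    inBuffer.put(None)
for t in threadList:
    t.join()

# Blocks may finish in any order, so sort by blockID
results = sorted(outBuffer.get() for i in range(5))
```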

computeWorkerKind = 'CW_THREADS'
computeWorkersRead_default = False

rios.computemanager.getComputeWorkerManager(cwKind)[source]

Returns a compute-worker manager object of the requested kind.
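One plausible way to implement such a lookup is to match the requested kind against the computeWorkerKind attribute of each subclass. The sketch below uses toy classes and the hypothetical function `getManagerSketch`; the mechanism actually used by RIOS may differ.

```python
class BaseMgrSketch:
    computeWorkerKind = 'CW_NONE'


class ThreadsMgrSketch(BaseMgrSketch):
    computeWorkerKind = 'CW_THREADS'


class SubprocMgrSketch(BaseMgrSketch):
    computeWorkerKind = 'CW_SUBPROC'


def getManagerSketch(cwKind):
    """
    Return a new instance of the subclass whose computeWorkerKind
    matches cwKind. Raise ValueError for an unknown kind.
    """
    for subclass in BaseMgrSketch.__subclasses__():
        if subclass.computeWorkerKind == cwKind:
            return subclass()
    raise ValueError("Unknown compute-worker kind {!r}".format(cwKind))


mgr = getManagerSketch('CW_THREADS')
```

A lookup like this would need a special case for ClassicBatchComputeWorkerMgr, since its class-level computeWorkerKind is None and is only set to CW_PBS or CW_SLURM after construction.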