
Processes


Process Overview

Control over the timing of execution of activities is another area that differentiates workflows from traditional programs. A cloud workflow allows for describing explicitly how multiple activities are orchestrated: whether they run sequentially or concurrently and when to synchronize.

The building blocks for describing the sequencing are the sub and concurrent expressions. All sub-expressions of a sub expression execute sequentially, while all sub-expressions of a concurrent expression execute concurrently. sub expressions can be nested inside concurrent expressions and vice versa, providing the means to describe what runs concurrently and when to synchronize.


concurrent do # the following two blocks execute concurrently
  sub do # Expressions below (until 'end') execute sequentially
    @servers = rs.servers.get(filter: ["name==my_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end
  sub do
    @servers = rs.servers.get(filter: ["name==my_other_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end
end # Marks the synchronization point for the concurrence
# At this point all servers are operational: both concurrent sequences are finished.

Process Tasks

All activities taking place in a process occur in tasks. Each sub-expression of a concurrent expression runs in its own task. Processes start with one task: the main task. The task_name attribute can be used to adorn sub-expressions of the concurrent expression to name tasks. This attribute can be specified on any expression (changing the name of the current task each time). However, the convention is to adorn the outer sub expression if there is one. That name can then be used to pause, resume, cancel, abort, or wait for that task:


concurrent do
  sub task_name:"maintain_application" do
    while true do
      @instances = rs.tags.by_tag(tags: ["app:state=active"], resource_type: "instances")
      call check_instances(@instances)
      sleep 15m # Check all instances of the app every 15 minutes
    end
  end
  sub do # Naming a task is optional
    while true do
      sub on_error: handle_update_error() do
        call update_application()
      end
      sleep 1d
    end
  end
end

define handle_update_error() do
  pause_task maintain_application  # First pause maintenance task
  call repair_application()
  resume_task maintain_application # Resume maintenance
end

define update_application() do
  # ... Something that could fail
end

 

The snippet above shows two recurring tasks, one of which needs to be controlled by the other. In this example, when an error occurs in the second task, the maintain_application task needs to be paused and later resumed. The controlling task accomplishes this by using the pause_task and resume_task keywords respectively.

Task Names

Tasks can be referred to using two different names. The local name (used in the example above) is the name given with the task_name attribute. It can only be used to refer to a sibling task, that is, a task launched by the same task that launched the task using the name. The global name is built from the names of the parent tasks, recursively (excluding the main task), combined with the current task name using / as separator:


concurrent do
  sub task_name: grand_parent do
    concurrent do
      sub task_name: parent do
        concurrent do
          sub task_name: child do
            # do something
          end 
          concurrent do
            sub do
              wait_task grand_parent/parent/child # cannot use local name for 'child' task because
                                                  # not in a sibling task
              # do something
            end
            sub do
              # do something
            end
          end   
        end
      end
    end
  end
end

Tasks that are not explicitly named using the task_name attribute get assigned a unique name by the engine. The task_name() function (functions are covered in the next section) returns the global name of the current task.
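
The naming rule can be modeled outside RCL. The following Python sketch is only an illustration: the global_task_name function is hypothetical and is not part of RCL or the engine. It joins the names of the enclosing tasks (main task excluded) with the task's own name:

```python
def global_task_name(ancestor_names, local_name):
    """Join ancestor task names (main task excluded) and the local name with '/'."""
    return "/".join(list(ancestor_names) + [local_name])


# The 'child' task from the example above, nested under grand_parent and parent
print(global_task_name(["grand_parent", "parent"], "child"))

# A task launched directly by the main task has no prefix
print(global_task_name([], "foo"))
```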

Note that quotes around task names are optional. Strings and variables containing strings can be used wherever task names can:


concurrent do
  sub task_name: "foo" do
    # do something
  end
  sub do
    $task_name = "foo"
    wait_task $task_name 
    # do something
  end
end

Controlling Tasks

As mentioned earlier, tasks can be paused, resumed, canceled, or aborted. The respective keywords are pause_task, resume_task, cancel_task, and abort_task. Each of these is optionally followed by a task name. If no task name is given then the action applies to the current task.

Note that tasks can only be paused between expressions, that is if a request to pause a task is made while the task is running an expression that task will keep running until the expression is done and control returns to the engine proper. So if an expression takes a long time to execute, like running an action on a collection containing many resources, pausing the task won't suspend the execution of that expression. The action will keep running on all resources. Only when that expression is done will the task be paused.

A task that is paused prevents the process from completing. All tasks must finish for the whole process to complete successfully.

A process can also be paused, resumed, canceled, or aborted in its entirety using the pause, resume, cancel, or abort keyword respectively. Executing any of these has the same effect as executing the corresponding task version in all currently running tasks. In particular, this means that pausing, canceling, or aborting a process will take effect once all tasks have finished running their current expression. The exact behavior of canceling and aborting is described below in the Ending processes section.

Note that (global) task names can be used in any expression; even expressions that are not sub-expressions of the concurrent expression that creates the task:


# If an error occurs during launch then cancel all launching tasks and terminate all servers.
define cancel_launch(@servers) do
  cancel_task launch_app_server
  cancel_task launch_app_fe
  @servers.terminate()
end

...
@servers = rs.deployments.get(filter: ["name==default"]).servers() # Retrieve all servers of "default" deployment
concurrent do
  sub task_name: launch_app_server, on_error: cancel_launch(@servers) do
    @app_servers = select(@servers, { "name": "app_server" }) # Select servers named "app_server"
    @app_servers.launch()                                     # and launch them
    sleep_until(all?(@app_servers.state[], "operational"))    # Wait until they all are operational
  end
  sub task_name: launch_app_fe, on_error: cancel_launch(@servers) do
    @fe_servers = select(@servers, { "name": "front_end" })  # Select servers named "front_end"
    @fe_servers.launch()                                     # and launch them
  end
end

Canceling a task that has completed is not an error; it just has no effect. However, if a task with the given name does not exist (i.e. there was never a task started with that name) then an error is raised. Similarly, pausing a task that is already paused or resuming a task that is already running has no effect as long as the task name is valid (otherwise an error is raised).

Control Priority

When multiple tasks attempt to control the same task concurrently the following priority order applies:

  • abort_task is always applied, meaning the target task is aborted as soon as at least one other task aborts it
  • cancel_task is applied if there is no abort_task call
  • pause_task is applied if there is no abort_task nor cancel_task calls
  • Finally resume_task takes effect only if it's the only control action being executed on the target task
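
The priority order above can be read as a simple resolution rule. The Python sketch below is a model under stated assumptions, not the engine's implementation: the resolve_control function and the representation of requests as a list of keyword strings are both hypothetical.

```python
# Highest priority first, following the order listed above
PRIORITY = ["abort_task", "cancel_task", "pause_task", "resume_task"]


def resolve_control(requests):
    """Return the action that wins among concurrent control requests, or None if there are none."""
    for action in PRIORITY:
        if action in requests:
            return action
    return None


# abort_task wins over everything else
print(resolve_control(["pause_task", "abort_task", "resume_task"]))
# resume_task only takes effect when it is the only kind of action requested
print(resolve_control(["resume_task", "resume_task"]))
```

Because the list is scanned from highest to lowest priority, resume_task can only be returned when no abort, cancel, or pause request is present, which matches the last bullet.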

Checking for Tasks

As covered earlier, a task can be waited on using the wait_task keyword. The current task blocks until the given task finishes (i.e. completes, fails, is canceled, or is aborted). Like other task controlling actions (pause_task, resume_task, etc.), wait_task will have no effect if the task has already completed, but will raise an error if there is no task with the given name.

This means that there needs to be a way to wait for a task to start to guarantee that these actions do not result in errors. The expect_task keyword can be used for that purpose. Here is an admittedly contrived example:


@servers = rs.deployments.get(filter: ["name==default"]).servers()
concurrent do
  sub task_name: launch_app_server do
    @servers = select(@servers, { name: "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: launch_app_fe do
    expect_task launch_app_server # We can be sure that the launch_app_server task has started so we can pause it
    pause_task launch_app_server
    @servers = select(@servers, { name: "front_end" })
    @servers.launch()
    resume_task launch_app_server
  end
end

expect_task blocks until a task with the corresponding name exists (and thus never raises an error).

Note: The state of the task does not matter: expect_task will not block if a task with the corresponding name has already finished.

wait_task can also be used with a number indicating the number of tasks that should be waited on. The task running the wait_task expression blocks until the given number of tasks complete. Note that this form is mostly useful when used as an attribute on a concurrent expression to indicate how many concurrent tasks should complete before the next expression runs.

Finally, wait_task also accepts an array of task names corresponding to the tasks that should complete prior to the execution resuming. This form can also be used as an attribute:


@servers = rs.deployments.get(filter: "name==default").servers()
concurrent wait_task: [launch_app_server, launch_app_fe] do 
  sub task_name: launch_app_server do
    @servers = select(@servers, { "name": "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: launch_app_fe do
    @servers = select(@servers, { "name": "front_end" })
    @servers.launch()
  end
  sub do
    @servers = select(@servers, { "name": "diagnostics_servers" })
    @servers.launch()
  end
end
# At this point the diagnostics servers may not have been launched yet (the last sub block may not have completed)

Synchronization Primitives

The most basic synchronization primitive is a bare concurrent expression. This expression will block until all sub-expressions have completed. Sometimes more control is needed. For example, it may suffice for one of the concurrent expressions to finish before proceeding. The concurrent expression wait_task attribute can be used in two different ways to provide the additional control:

  • When wait_task is followed by an integer, the concurrent expression will return after the corresponding number of tasks have completed.
  • When wait_task is followed by a list of task names, the concurrent expression will return after the corresponding tasks have completed.


In the following example:


concurrent wait_task: 1 do
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server_1"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server_2"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
end
# At this point at least one of the sequences above has completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=front_end"])
@servers.launch()

The front-ends will be launched as soon as either all servers tagged with app:role=app_server_1 or all servers tagged with app:role=app_server_2 are operational. As stated above, tasks can also be waited on using their names:


concurrent wait_task: [databases, app_servers] do
  sub task_name: databases do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=database"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: app_servers do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: additional do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=additional"])
    @servers.launch()
  end
end
# At this point the databases and app_servers tasks have completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=front_end"])
@servers.launch()

One interesting application of the wait_task attribute is when used in conjunction with the number 0 as follows:


concurrent wait_task: 0 do
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=database"])
    @servers.launch()
  end
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server"])
    @servers.launch()
  end
end # At this point the tasks above may not have completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=diag_server"])
@servers.launch()

In this case, the process proceeds past the concurrent expression without waiting for any of the launched tasks. This is the same behavior as wrapping the whole definition extract above in an outer concurrent.
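
The integer form of wait_task can be modeled with ordinary futures. The following Python sketch is an analogy only and says nothing about how the engine is built: wait_for_count and run_demo are hypothetical names, and threads stand in for tasks. A count of 0 returns immediately, mirroring wait_task: 0.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def wait_for_count(futures, count):
    """Block until `count` of the futures have finished and return those futures."""
    done = []
    if count <= 0:
        return done  # like wait_task: 0, do not wait for anything
    for future in as_completed(futures):
        done.append(future)
        if len(done) >= count:
            break
    return done


def run_demo():
    """Start a fast and a blocked task, then proceed as soon as one of them finishes."""
    release = threading.Event()
    with ThreadPoolExecutor(max_workers=2) as pool:
        fast = pool.submit(lambda: "fast")
        slow = pool.submit(release.wait)  # blocks until released below
        try:
            finished = wait_for_count([fast, slow], 1)
        finally:
            release.set()  # let the slow task finish so the pool can shut down
    return finished[0].result()


print(run_demo())
```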

Tasks and State

Whenever a task is launched it gets its own copy of the parent task state. This includes all references and all variables currently defined in the parent task.


$n = 3
@servers = rs.deployments.get(filter: ["name==default"]).servers()
concurrent do
  sub do
    # $n is equal to 3 and @servers contains all servers in the "default" deployment
    $n = 5
    @servers = rs.deployments.get(filter: ["name==other"]).servers()
    # $n is equal to 5 and @servers contains all servers in the "other" deployment
  end
  sub do
    # $n is equal to 3 and @servers contains all servers in the "default" deployment
  end
  end
end
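
The copy semantics shown above can be mimicked in a few lines of Python. This is a sketch under assumptions: launch_task is a hypothetical helper, state is modeled as a plain dictionary, and a deep copy is assumed to be a fair stand-in for "its own copy of the parent task state".

```python
import copy


def launch_task(parent_state, body):
    """Run `body` against a private deep copy of the parent's state and return that copy."""
    task_state = copy.deepcopy(parent_state)
    body(task_state)
    return task_state


parent = {"n": 3, "servers": ["default-1", "default-2"]}


def first_task(state):
    # Changes made here are only visible inside this task
    state["n"] = 5
    state["servers"] = ["other-1"]


def second_task(state):
    pass  # only reads its own copy


first = launch_task(parent, first_task)
second = launch_task(parent, second_task)
print(parent["n"], first["n"], second["n"])
```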

Once a task finishes, its state is discarded. However, it is sometimes necessary to retrieve state from a different task. RCL provides two mechanisms to share state across tasks:

  • The values for global references and variables are stored in the process, they can be written to and read from by any task.
  • A concurrent sub expression may optionally "return" local variables or references. Such values are merged back into the parent task. If multiple tasks in the concurrence return the same value then the behavior is undefined, in other words the code needs to use different names for values returned by different tasks.

Here is an example using the return keyword:

define main(@server1, @server2) return @server1, @server2 do
  concurrent return @server1, @server2 do
    sub do
      provision(@server1)
    end
    sub do
      provision(@server2)
    end
  end
  # @server1 and @server2 are now operational
end

concurrent foreach

Another way to create tasks in a process apart from concurrent is through the concurrent foreach expression. This expression runs all sub-expressions in sequence on all resources in the given resources collection concurrently. In other words a task is created for each resource in the collection:


@instances = rs.get(href: "/api/deployments/123").servers().current_instance()
concurrent foreach @instance in @instances do
  @instance.run_executable(recipe_name: "cassandra::default")
  @instance.run_executable(recipe_name: "cassandra::configure")
  @instance.run_executable(recipe_name: "cassandra::restart")
end

In the snippet above, the three executables are run sequentially on each instance, and all instances in the collection are processed at once. If the @instances collection in the example above contained two resources the following would have the same effect:


@instances = rs.get(href: "/api/deployments/123").servers().current_instance()
concurrent do
  sub do
    @instance = @instances[0]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end
  sub do
    @instance = @instances[1]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end
end

Sometimes it is necessary to explicitly refer to one of the tasks that was spawned from the concurrent foreach execution. The task_prefix attribute is only valid for the concurrent foreach expression and allows defining a common prefix for all generated tasks. The task names are built from the prefix and the index of the resource in the collection:


concurrent do
  sub task_name: run_scripts do
    @servers = rs.get(href: "/api/deployments/123").servers()
    concurrent foreach @server in @servers task_prefix: cassandra_setup do
      @instance = @server.current_instance()
      @instance.run_executable(recipe_name: "cassandra::default")
      @instance.run_executable(recipe_name: "cassandra::configure")
      @instance.run_executable(recipe_name: "cassandra::restart")
    end
  end
  sub do
    expect_task run_scripts/cassandra_setup0
    cancel_task run_scripts/cassandra_setup0 # Cancel execution of scripts on first server in collection above
  end
end

In the example above, cassandra_setup0 refers to the task generated to run the concurrent foreach sub-expressions on the first resource in the @servers collection.

concurrent map

Apart from concurrent and concurrent foreach, concurrent map is the only other way to create tasks in a process. A concurrent map works as expected: each iteration runs concurrently and the resulting collections are built from the results of each iteration.

Note: Even though the resulting collections are built concurrently, concurrent map guarantees that the order of elements in the final collection(s) matches the order of the collection being iterated on.

So for example:


@servers = rs.get(href: "/api/deployments/123").servers()
# Launch all servers concurrently and conditionally run a script on the resulting
# instance once it is operational.
@instances = concurrent map @server in @servers return @instance do
  @instance = @server.launch()
  sleep_until(@instance.state == "operational")
  if @instance.name =~ "/^app_/"
    @instance.run_executable(recipe_name: "app::connect")
  end
end

In the example above the instances in the @instances collection will be ordered identically to the servers in the @servers collection (that is, the instance at a given index in the @instances collection will correspond to the server at the same index in the @servers collection).
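
The same ordering guarantee exists in other concurrent map implementations. As an analogy only, Python's ThreadPoolExecutor.map also returns results in input order regardless of which iteration finishes first. The launch function and the server names below are made up for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def launch(server_name):
    """Stand-in for launching a server; items earlier in the list deliberately take longer."""
    delay = {"db": 0.03, "app": 0.02, "fe": 0.01}[server_name]
    time.sleep(delay)
    return server_name + "_instance"


servers = ["db", "app", "fe"]
with ThreadPoolExecutor(max_workers=3) as pool:
    # Iterations run concurrently, yet results come back in the order of `servers`
    instances = list(pool.map(launch, servers))

print(instances)
```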

Ending processes

A process ends once all its tasks end. There are four conditions that will cause the execution of a task to end:

  1. Completing the task: the task has no more expressions to run.
  2. Failing the task: an expression raised an error that was not handled
  3. Canceling the task: this can be done through the cancel and cancel_task keywords.
  4. Aborting the task: this can be done through the abort and abort_task keywords.

Canceling a task or a process

Canceling a task can be done at any time in any task using the cancel_task keyword. This provides a way to finish "cleanly" a task that still has expressions to run. The cloud workflow can define rollback handlers that get triggered when the task cancels. These handlers behave much like timeout or error handlers: they may take arbitrary arguments and inherit the local variables and references of the caller. Nested rollback handlers are executed in reverse order as shown in this example:


define delete_deployment($deployment_name) do
  @deployment = rs.deployments.get(filter: ["name==" + $deployment_name])
  @deployment.destroy()
end

define delete_servers($server_names) do
  foreach $name in $server_names do
    @server = rs.servers.get(filter: ["name==" + $name])
    @server.destroy()
  end
end

sub on_rollback: delete_deployment($deployment_name) do  # Assumes $deployment_name exists
  rs.deployments.create(deployment: { "name": $deployment_name })
  # ... do more stuff
  sub on_rollback: delete_servers($server_names) do    # Assumes $server_names exists
    foreach $name in $server_names do
      # Assumes $server_params exists and is a hash of all required params to create a server
      rs.servers.create(server: { "name": $name } + $server_params)
      # ... do more stuff
    end
    # ... do more stuff, initialize $has_errors
    if $has_errors
      cancel_task # Will call both delete_servers and delete_deployment in this order
    end
  end
end

In this snippet, if $has_errors is true then the task is canceled and both the delete_servers and delete_deployment rollback handlers run, in that order.
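
The reverse ordering of nested rollback handlers behaves like a stack. The Python sketch below models that behavior only: the RollbackStack class and its method names are hypothetical and do not correspond to anything in RCL.

```python
class RollbackStack:
    """Records rollback handlers as scopes are entered and runs them innermost first on cancel."""

    def __init__(self):
        self._handlers = []

    def on_rollback(self, handler, *args):
        # Called when entering a scope that declares a rollback handler
        self._handlers.append((handler, args))

    def cancel(self):
        # Unwind: the most recently registered (innermost) handler runs first
        while self._handlers:
            handler, args = self._handlers.pop()
            handler(*args)


log = []
task = RollbackStack()
task.on_rollback(log.append, "delete_deployment")  # outer sub
task.on_rollback(log.append, "delete_servers")     # inner sub
task.cancel()
print(log)
```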

Note: the cancel_task keyword can also be used as an attribute value for the on_timeout and on_error attributes, meaning that if a timeout or an error occurs respectively then the task is canceled.

Canceling a process is done using the cancel keyword. This causes all the running tasks to be canceled and follow the same logic as above, potentially executing multiple rollback handlers concurrently. Once all rollback handlers finish then the process ends and the status of all its tasks is set to canceled.

Aborting a task or a process

Tasks can also be terminated through the abort_task keyword. This causes the task to finish immediately bypassing all rollback handlers. The abort keyword causes all the tasks in the current process to be aborted. The process thus finishes immediately and the status of all its tasks is set to aborted.

Concurrency and Expressions

As described in Cloud Workflow and Definitions, a definition consists of a sequence of statements. Each statement is in turn made of expressions. The engine makes the following guarantee:

Expressions always run atomically

In particular, if we consider any expression running in a concurrence (inside a concurrent, concurrent foreach, or concurrent map), then the rule above dictates that each concurrent expression runs atomically. So if we consider:


concurrent do
  sub do
    @servers = rs.servers.get(filter: "name==app_server")
    @@instances << @servers.launch() # (1)
  end
  sub do
    @@instances << rs.get(href: "/api/servers/123").launch() # (2)
  end
  end
end

In the definition above, statement (1) is composed of two expressions: the call to the launch() action followed by the append to @@instances. Statement (2) is composed of three expressions: the call to get(), followed by the call to launch(), and finally the append to the @@instances collection. Since expressions run atomically, the definition above guarantees that the @@instances collection will end up with all instances; there is no need to explicitly synchronize the appends to @@instances. There is no guarantee about ordering though, so the single instance retrieved in statement (2) could be first in the collection.

Note that the following could generate inconsistent results:


# DO NOT DO THIS
@instances = rs.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
$$failed = 0
concurrent foreach @instance in @instances do
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed/"
    $$failed = $$failed + 1 # (1) Oops, $$failed can be overridden concurrently
  end
end

In the example above, statement (1) is comprised of two expressions: the increment and the assignment. If two tasks were to increment concurrently after reading the same value then one of the increments would get lost (both tasks would write back the same value to $$failed). The concurrent map expression should be used instead to build results concurrently:


# DO THIS INSTEAD
@instances = rs.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
$failed_ones = concurrent map @instance in @instances return $failed_one do
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed/"
    $failed_one = 1
  end # Do not return anything on success
end
$failed = size($failed_ones) 

The concurrent map expression takes care of building the resulting array from the results returned by each concurrent execution. There is no problem of the task overriding values concurrently in this case.
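
The lost update described above can be reproduced deterministically by spelling out one bad interleaving. The Python sketch below is an illustration only: lost_update and count_with_map are hypothetical helpers, and the summaries are made-up strings standing in for @task.summary values.

```python
def lost_update():
    """Replay the interleaving where two tasks read the counter before either writes it back."""
    shared = {"failed": 0}
    read_a = shared["failed"]      # task A reads 0
    read_b = shared["failed"]      # task B reads 0 before A has written
    shared["failed"] = read_a + 1  # task A writes 1
    shared["failed"] = read_b + 1  # task B also writes 1, so A's increment is lost
    return shared["failed"]


def count_with_map(summaries):
    """Map-style alternative: each iteration yields its own result, counted once at the end."""
    results = [1 if summary.startswith("failed") else None for summary in summaries]
    return len([result for result in results if result is not None])


print(lost_update())
print(count_with_map(["failed: timeout", "completed", "failed: exit 1"]))
```

In the second function no iteration ever writes to a value that another iteration reads, which is why the final count cannot be affected by the order in which iterations run.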

Summary

A process may run one or more tasks concurrently at any time. RCL allows for describing how these tasks should be synchronized by providing both synchronization primitives and keywords for controlling tasks individually. A process ends once all its tasks end. A task ends when it completes (no more expressions to execute), fails (an expression raises an error that is not handled), is canceled, or is aborted. Canceling a task will cause any rollback handler to trigger and do additional processing before the task ends.

Note: the concepts of tasks and definitions are completely disjoint and should not be confused: a definition always runs in the task that ran the call expression. In other words, simply using call does not create a new task.

Last modified
15:37, 19 Mar 2015


© 2006-2014 RightScale, Inc. All rights reserved.
RightScale is a registered trademark of RightScale, Inc. All other products and services may be trademarks or servicemarks of their respective owners.