The building blocks for describing the sequencing are the sub and concurrent expressions. All sub-expressions of a sub expression execute sequentially while all sub-expressions of a concurrent expression execute concurrently. sub expressions can be nested inside concurrent expressions and vice versa, providing the means for describing what runs concurrently and when to synchronize.
concurrent do # the following two blocks execute concurrently
  sub do # Expressions below (until 'end') execute sequentially
    @servers = rs.servers.get(filter: ["name==my_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end
  sub do
    @servers = rs.servers.get(filter: ["name==my_other_server"])
    @instances = @servers.launch()
    sleep_until(all?(@instances.state, "operational"))
  end
end # Marks the synchronization point for the concurrence
# At this point all servers are operational: both concurrent sequences are finished.
All activities taking place in a process occur in tasks. Each sub-expression of a concurrent expression runs in its own task. Processes start with one task: the main task. The task_name attribute can be used to adorn sub-expressions of the concurrent expression to name tasks. This attribute can be specified on any expression (changing the name of the current task each time). However, the convention is to adorn the outer sub expression if there is one. That name can then be used to pause, resume, cancel, abort, or wait for that task:
concurrent do
  sub task_name: "maintain_application" do
    while true do
      @instances = rs.tags.by_tag(tags: ["app:state=active"], resource_type: "instances")
      call check_instances(@instances)
      sleep 15m # Check all instances of the app every 15 minutes
    end
  end
  sub do # Naming a task is optional
    while true do
      sub on_error: handle_update_error() do
        call update_application()
      end
      sleep 1d
    end
  end
end

define handle_update_error() do
  pause_task maintain_application # First pause maintenance task
  call repair_application()
  resume_task maintain_application # Resume maintenance
end

define update_application() do
  # ... Something that could fail
end
The snippet above shows two recurring tasks, where one of the tasks needs to be controlled by the other. In this example, when an error occurs in the second task, the maintain_application task needs to be paused and later resumed. The controlling task accomplishes this using the pause_task and resume_task keywords respectively.
Tasks can be referred to using two different names. The local name (used in the example above) is the name given with the task_name attribute; it can only be used to refer to a sibling task, that is, a task launched by the same task that launched the one using the name. The other way to address a task is its global name, which is built from the names of its parent tasks (excluding the main task) combined with the task's own name, using / as separator:
concurrent do
  sub task_name: grand_parent do
    concurrent do
      sub task_name: parent do
        concurrent do
          sub task_name: child do
            # do something
          end
          concurrent do
            sub do
              wait_task grand_parent/parent/child # cannot use local name for 'child' task because
                                                  # not in a sibling task
              # do something
            end
            sub do
              # do something
            end
          end
        end
      end
    end
  end
end
Tasks that are not explicitly named using the task_name attribute get assigned a unique name by the engine. The task_name() function (functions are covered in the next section) returns the global name of the current task.
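For instance (a minimal sketch; the "worker" task name and the $self and $other variables are placeholders used only for illustration), the function can be used to capture the name of the task the code is currently running in:

concurrent do
  sub task_name: "worker" do
    # task_name() returns the global name of the current task,
    # e.g. "worker" here, or a /-separated name when nested inside named parent tasks
    $self = task_name()
  end
  sub do
    # This sub was not named explicitly, so task_name() returns
    # the unique name assigned by the engine
    $other = task_name()
  end
end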
Note that quotes around task names are optional, and that strings and variables containing strings can be used wherever task names can:
concurrent do sub task_name: "foo" do # do something end sub do $task_name = "foo" wait_task $task_name # do something end end
As mentioned earlier, tasks can be paused, resumed, canceled, or aborted. The respective keywords are pause_task, resume_task, cancel_task, and abort_task. Each of these is optionally followed by a task name; if no task name is given, the action applies to the current task.
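For example (a minimal sketch; the worker task name and the do_work and inspect_state definitions are hypothetical), the same keyword can act on a named sibling task or, with no name, on the current task:

concurrent do
  sub task_name: worker do
    call do_work()
  end
  sub do
    expect_task worker  # Wait until the sibling task exists (expect_task is described further below)
    pause_task worker   # Explicit name: pauses the sibling "worker" task
    call inspect_state()
    resume_task worker  # Resume the sibling task
    cancel_task         # No task name: cancels the current task (this sub)
  end
end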
Note that tasks can only be paused between expressions: if a request to pause a task is made while the task is running an expression, the task keeps running until that expression is done and control returns to the engine proper. So if an expression takes a long time to execute, such as running an action on a collection containing many resources, pausing the task won't suspend the execution of that expression. The action will keep running on all resources; only once that expression is done will the task be paused.
A task that is paused prevents the process from completing. All tasks must finish for the whole process to complete successfully.
A process can also be paused, resumed, canceled, or aborted in its entirety using the pause, resume, cancel, or abort keyword, respectively. Executing any of these has the same effect as executing the corresponding task version in all currently running tasks. In particular, this means that pausing, canceling, or aborting a process will take effect once all tasks have finished running their current expression. The exact behavior of canceling and aborting is described below in the Ending Cloud Workflow processes section.
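For example (a minimal sketch; handle_fatal_error and update_inventory are hypothetical definitions), an error handler can cancel the whole process rather than a single task:

define handle_fatal_error() do
  # Cancels the entire process: every running task finishes its current
  # expression, then its rollback handlers (if any) run before the process ends
  cancel
end

sub on_error: handle_fatal_error() do
  call update_inventory()
end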
Note that (global) task names can be used in any expression, even expressions that are not sub-expressions of the concurrent expression that creates the task:
# If an error occurs during launch then cancel all launching tasks and terminate all servers.
define cancel_launch(@servers) do
  cancel_task launch_app_server
  cancel_task launch_app_fe
  @servers.terminate()
end

...

@servers = rs.deployments.get(filter: ["name==default"]).servers() # Retrieve all servers of "default" deployment
concurrent do
  sub task_name: launch_app_server, on_error: cancel_launch(@servers) do
    @app_servers = select(@servers, { "name": "app_server" })  # Select servers named "app_server"
    @app_servers.launch()                                      # and launch them
    sleep_until(all?(@app_servers.state[], "operational"))     # Wait until they all are operational
  end
  sub task_name: launch_app_fe, on_error: cancel_launch(@servers) do
    @fe_servers = select(@servers, { "name": "front_end" })    # Select servers named "front_end"
    @fe_servers.launch()                                       # and launch them
  end
end
Canceling a task that has completed is not an error; it just has no effect. However, if a task with the given name does not exist (i.e. a task with that name was never started) then an error is raised. Similarly, pausing a task that is already paused or resuming a task that is already running has no effect, as long as the task name is valid (otherwise an error is raised).
When multiple tasks attempt to control the same task concurrently, the following priority order applies:
As covered earlier, a task can be waited on using the wait_task keyword. The current task blocks until the given task finishes (i.e. completes, fails, is canceled, or is aborted). Like other task-controlling actions (pause_task, resume_task, etc.), wait_task has no effect if the task has already completed, but raises an error if there is no task with the given name.
This means that there needs to be a way to wait for a task to start, to guarantee that these actions do not result in errors. The expect_task keyword can be used for that purpose; here is an admittedly contrived example:
@servers = rs.deployments.get(filter: ["name==default"]).servers()
concurrent do
  sub task_name: launch_app_server do
    @servers = select(@servers, { name: "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: launch_app_fe do
    expect_task launch_app_server
    # We can be sure that the launch_app_server task has started so we can pause it
    pause_task launch_app_server
    @servers = select(@servers, { name: "front_end" })
    @servers.launch()
    resume_task launch_app_server
  end
end
expect_task blocks until a task with the corresponding name exists (and thus never raises an error).
Note: The state of the task does not matter: expect_task will not block if a task with the corresponding name has already finished.
wait_task can also be used with a number indicating the number of tasks that should be waited on. The task running the wait_task expression blocks until the given number of tasks complete. Note that this form is mostly useful when used as an attribute on a concurrent expression to indicate how many concurrent tasks should complete before the next expression runs.
Finally, wait_task also accepts an array of task names corresponding to the tasks that should complete prior to the execution resuming. This form can also be used as an attribute:
@servers = rs.deployments.get(filter: "name==default").servers()
concurrent wait_task: [launch_app_server, launch_app_fe] do
  sub task_name: launch_app_server do
    @servers = select(@servers, { "name": "app_server" })
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: launch_app_fe do
    @servers = select(@servers, { "name": "front_end" })
    @servers.launch()
  end
  sub do
    @servers = select(@servers, { "name": "diagnostics_servers" })
    @servers.launch()
  end
end
# At this point the diagnostics servers may not have been launched yet (the last sub block may not have completed)
The most basic synchronization primitive is a bare concurrent expression. This expression blocks until all sub-expressions have completed. Sometimes more control is needed; for example, it may suffice for one of the concurrent sequences to finish before proceeding. The concurrent expression's wait_task attribute can be used in two different ways to provide this additional control: it can be given a number indicating how many of the concurrent tasks must complete before execution proceeds, or a list of task names identifying the specific tasks that must complete.
In the following example:
concurrent wait_task: 1 do
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server_1"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server_2"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
end
# At this point at least one of the sequences above has completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=front_end"])
@servers.launch()
The front-ends will be launched as soon as either all servers tagged with app:role=app_server_1 or all servers tagged with app:role=app_server_2 are operational. As stated above, tasks can also be waited on using their names:
concurrent wait_task: [databases, app_servers] do
  sub task_name: databases do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=database"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: app_servers do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server"])
    @servers.launch()
    sleep_until(all?(@servers.state[], "operational"))
  end
  sub task_name: additional do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=additional"])
    @servers.launch()
  end
end
# At this point the databases and app_servers tasks have completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=front_end"])
@servers.launch()
One interesting application of the wait_task attribute is when used in conjunction with the number 0 as follows:
concurrent wait_task: 0 do
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=database"])
    @servers.launch()
  end
  sub do
    @servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=app_server"])
    @servers.launch()
  end
end
# At this point the tasks above have not completed
@servers = rs.tags.by_tag(resource_type: "servers", tags: ["app:role=diag_server"])
@servers.launch()
In this case, the process proceeds past the concurrent expression without waiting for any of the launched tasks. This is the same behavior as wrapping the whole definition extract above in an outer concurrent.
Whenever a task is launched it gets its own copy of the parent task state. This includes all references and all variables currently defined in the parent task.
$n = 3
@servers = rs.deployments.get(filter: ["name==default"]).servers()
concurrent do
  sub do
    # $n is equal to 3 and @servers contains all servers in the "default" deployment
    $n = 5
    @servers = rs.deployments.get(filter: ["name==other"]).servers()
    # $n is equal to 5 and @servers contains all servers in the "other" deployment
  end
  sub do
    # $n is equal to 3 and @servers contains all servers in the "default" deployment
  end
end
Once a task finishes, its state is discarded; however, it is sometimes necessary to retrieve state from a different task. RCL provides two mechanisms for sharing state across tasks: the return keyword, which lists the variables and references to copy back to the parent task, and global variables and global resource collections, whose names are prefixed with $$ and @@ respectively and which are visible to all tasks in the process (see the @@instances and $$failed examples further below).
Here is an example using the return keyword:
define main(@server1, @server2) return @server1, @server2 do
  concurrent return @server1, @server2 do
    sub do
      provision(@server1)
    end
    sub do
      provision(@server2)
    end
  end
  # @server1 and @server2 are now operational
end
Another way to create tasks in a process, apart from concurrent, is the concurrent foreach expression. This expression runs its sub-expressions in sequence for each resource in the given collection, processing all resources concurrently. In other words, a task is created for each resource in the collection:
@instances = rs.get(href: "/api/deployments/123").servers().current_instance()
concurrent foreach @instance in @instances do
  @instance.run_executable(recipe_name: "cassandra::default")
  @instance.run_executable(recipe_name: "cassandra::configure")
  @instance.run_executable(recipe_name: "cassandra::restart")
end
In the snippet above, the three recipes are run sequentially on each instance, and all instances in the collection are processed at once. If the @instances collection in the example above contained two resources, the following would have the same effect:
@instances = rs.get(href: "/api/deployments/123").servers().current_instance()
concurrent do
  sub do
    @instance = @instances[0]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end
  sub do
    @instance = @instances[1]
    @instance.run_executable(recipe_name: "cassandra::default")
    @instance.run_executable(recipe_name: "cassandra::configure")
    @instance.run_executable(recipe_name: "cassandra::restart")
  end
end
Sometimes it is necessary to explicitly refer to one of the tasks that was spawned from the concurrent foreach execution. The task_prefix attribute is only valid for the concurrent foreach expression and allows defining a common prefix for all generated tasks. The task names are built from the prefix and the index of the resource in the collection:
concurrent do
  sub task_name: run_scripts do
    @servers = rs.get(href: "/api/deployments/123").servers()
    concurrent foreach @server in @servers task_prefix: cassandra_setup do
      @instance = @server.current_instance()
      @instance.run_executable(recipe_name: "cassandra::default")
      @instance.run_executable(recipe_name: "cassandra::configure")
      @instance.run_executable(recipe_name: "cassandra::restart")
    end
  end
  sub do
    expect_task run_scripts/cassandra_setup0
    cancel_task run_scripts/cassandra_setup0 # Cancel execution of scripts on first server in collection above
  end
end
In the example above, cassandra_setup0 refers to the task generated to run the concurrent foreach sub-expressions on the first resource in the @servers collection.
Apart from concurrent and concurrent foreach, concurrent map is the only other way to create tasks in a process. A concurrent map works as expected: each iteration runs concurrently and the resulting collections are built from the results of each iteration.
Note: Even though the resulting collections are built concurrently, concurrent map guarantees that the order of elements in the final collection(s) match the order of the collection being iterated on.
So for example:
@servers = rs.get(href: "/api/deployments/123").servers()
# Launch all servers concurrently and conditionally run a script on the resulting
# instance once it is operational.
@instances = concurrent map @server in @servers return @instance do
  @instance = @server.launch()
  sleep_until(@instance.state == "operational")
  if @instance.name =~ "/^app_/"
    @instance.run_executable(recipe_name: "app::connect")
  end
end
In the example above the instances in the @instances collection will be ordered identically to the servers in the @servers collection (that is, the instance at a given index in the @instances collection will correspond to the server at the same index in the @servers collection).
A process ends once all its tasks end. There are four conditions that will cause the execution of a task to end: the task completes (there are no more expressions to execute), it fails (an expression raises an error that is not handled), it is canceled, or it is aborted.
Canceling a task can be done at any time from any task using the cancel_task keyword. This provides a way to finish "cleanly" a task that still has expressions to run. The cloud workflow can define rollback handlers that get triggered when the task is canceled. These handlers behave much like timeout or error handlers: they may take arbitrary arguments and inherit the local variables and references of the caller. Nested rollback handlers are executed in reverse order, as shown in this example:
define delete_deployment($deployment_name) do
  @deployment = rs.deployments.get(filter: ["name==" + $deployment_name])
  @deployment.destroy()
end

define delete_servers($server_names) do
  foreach $name in $server_names do
    @server = rs.servers.get(filter: ["name==" + $name])
    @server.destroy()
  end
end

sub on_rollback: delete_deployment($deployment_name) do # Assumes $deployment_name exists
  rs.deployments.create(deployment: { "name": $deployment_name })
  # ... do more stuff
  sub on_rollback: delete_servers($server_names) do # Assumes $server_names exists
    foreach $name in $server_names do
      # Assumes $server_params exists and is a hash of all required params to create a server
      rs.servers.create(server: { "name": $name } + $server_params)
      # ... do more stuff
    end
    # ... do more stuff, initialize $has_errors
    if $has_errors
      cancel_task # Will call both delete_servers and delete_deployment in this order
    end
  end
end
In this snippet, if $has_errors gets initialized then the task is canceled and both the delete_servers and the delete_deployment handlers run, in that order.
Note: the cancel_task keyword can also be used as the value of the on_timeout and on_error attributes, meaning that the task is canceled if a timeout or an error occurs, respectively.
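For example (a minimal sketch; risky_update is a hypothetical definition):

sub on_error: cancel_task do
  # Any unhandled error below cancels the current task,
  # which in turn triggers any enclosing on_rollback handlers
  call risky_update()
end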
Canceling a process is done using the cancel keyword. This causes all running tasks to be canceled following the same logic as above, potentially executing multiple rollback handlers concurrently. Once all rollback handlers finish, the process ends and the status of all its tasks is set to canceled.
Tasks can also be terminated through the abort_task keyword. This causes the task to finish immediately, bypassing all rollback handlers. The abort keyword causes all the tasks in the current process to be aborted. The process thus finishes immediately and the status of all its tasks is set to aborted.
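To illustrate the difference (a minimal sketch; cleanup and do_work are hypothetical definitions):

sub on_rollback: cleanup() do
  call do_work()
  cancel_task   # Ends this task after running the cleanup() rollback handler
  # abort_task  # Would end this task immediately, skipping cleanup()
end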
As described in Cloud Workflow and Definitions, a definition consists of a sequence of statements. Each statement is in turn made of expressions. The engine makes the following guarantee:
Expressions always run atomically
In particular, if we consider any expression running in a concurrence (inside a concurrent, concurrent foreach, or concurrent map), then the rule above dictates that each concurrent expression runs atomically. So if we consider:
concurrent do
  sub do
    @servers = rs.servers.get(filter: "name==app_server")
    @@instances << @servers.launch() # (1)
  end
  sub do
    @@instances << rs.get(href: "/api/servers/123").launch() # (2)
  end
end
In the definition above, statement (1) is composed of two expressions: the call to the launch() action followed by the append to @@instances. Statement (2) is composed of three expressions: the call to get(), followed by the call to launch(), and finally the append to the @@instances collection. Since expressions run atomically, the definition above guarantees that the @@instances collection will end up with all instances; there is no need to explicitly synchronize the appends to @@instances. There is no guarantee about ordering, though, so the single instance retrieved in statement (2) could end up first in the collection.
Note that the following could generate inconsistent results:
# DO NOT DO THIS
@instances = rs.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
$$failed = 0
concurrent foreach @instance in @instances do
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed/"
    $$failed = $$failed + 1 # (1) Oops, $$failed can be overridden concurrently
  end
end
In the example above, statement (1) is composed of two expressions: the addition and the assignment. If two tasks were to increment concurrently after reading the same value, one of the increments would be lost (both tasks would write back the same value to $$failed). The concurrent map expression should be used instead to build results concurrently:
# DO THIS INSTEAD
@instances = rs.get(href: "/api/deployments/123").servers(filter: ["state==operational"]).current_instance()
$failed_ones = concurrent map @instance in @instances return $failed_one do
  @task = @instance.run_executable(recipe_name: "sys:do_reconverge_list_disable")
  sleep_until(@task.summary =~ "/^completed|^failed/")
  if @task.summary =~ "/^failed/"
    $failed_one = 1
  end
  # Do not return anything on success
end
$failed = size($failed_ones)
The concurrent map expression takes care of building the resulting array from the results returned by each concurrent execution. In this case there is no risk of tasks overwriting values concurrently.
A process may run one or more tasks concurrently at any time. RCL allows for describing how these tasks should be synchronized by providing both synchronization primitives and keywords for controlling tasks individually. A process ends once all its tasks end. A task ends when it completes (no more expressions to execute), fails (an expression raises an error that is not handled), is canceled, or is aborted. Canceling a task causes any rollback handlers to trigger and do additional processing before the task ends.
Note: the concepts of tasks and definitions are completely disjoint and should not be confused: a definition always runs in the task that ran the call expression. In other words, simply using call does not create a new task.
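As a quick illustration (a sketch; log_current_task is a hypothetical definition), task_name() inside a called definition returns the name of the calling task:

define log_current_task() do
  # Runs in the task that executed the 'call' expression below,
  # so this returns the name of the "setup" task, not a new task name
  $task = task_name()
end

concurrent do
  sub task_name: "setup" do
    call log_current_task()  # No new task is created by this call
  end
end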