WOLFRAM is the kernel component in charge of linking the node with other nodes in the cluster.

The cluster entity

At the core of its responsibilities is the maintenance of the cluster's real-time connectivity, using the configuration stored in the cluster entity, which is explained in the Administrator's View page.

This entity's state is replicated to every node (through the TUNGSTEN replication mechanisms discussed below), although it is not actually stored in the TUNGSTEN store like other entities. Instead it, along with the special node entity, is stored in a bootstrap configuration area on the node, provided by HYDROGEN.

The cluster entity is created automatically when a cluster is first bootstrapped, and contains global configuration and management information used across the cluster by WOLFRAM.

In-cluster communications

WOLFRAM is in charge of all communication within the cluster, which runs over an IRIDIUM transport on a default WOLFRAM UDP port unless configured otherwise. It does this using the list of nodes and the network layout hints found in the cluster configuration. It will attempt to use the "best" link between two nodes, according to the communication groups that the nodes share, but it will also be mindful of available bandwidth within those groups and will attempt to avoid over-committing.

It is responsible for encrypting and authenticating in-cluster communications, except over links that are trusted enough for this to be deemed unnecessary. Intra-volume communications are protected using the volume's symmetric key; other communications use the public key of the destination node's volume, taken from the list of volumes (remember that every node has a corresponding node volume, and every volume has a keypair, the public part of which is listed in the cluster configuration).

The underlying IRIDIUM link is used by WOLFRAM itself, and also made available to MERCURY for in-cluster communications.

Distributed transactions

Distributed transactions are available across the cluster. WOLFRAM provides the ability to create, commit, and abort transactions.

Integral to the distributed transaction system is the provision of Lamport timestamps, which are used to assign a causal order to events and to create cluster-global unique IDs. In both cases the node ID is appended to the Lamport timestamp to disambiguate simultaneous events.
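As a rough illustration of the timestamp scheme, here is a minimal Python sketch of a Lamport clock whose stamps carry the node ID as a tie-breaker. The names Stamp, LamportClock, tick and observe are invented for this example and are not part of any WOLFRAM interface.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Stamp:
    """A Lamport counter with the node ID appended as a tie-breaker."""
    counter: int
    node_id: int


class LamportClock:
    """Hypothetical per-node clock; not an actual WOLFRAM API."""

    def __init__(self, node_id: int) -> None:
        self.node_id = node_id
        self.counter = 0

    def tick(self) -> Stamp:
        """Stamp a local event; the result also serves as a unique ID."""
        self.counter += 1
        return Stamp(self.counter, self.node_id)

    def observe(self, remote: Stamp) -> Stamp:
        """Merge a stamp received from another node, then stamp the receipt."""
        self.counter = max(self.counter, remote.counter)
        return self.tick()


a, b = LamportClock(node_id=1), LamportClock(node_id=2)
s1 = a.tick()        # first event on node 1
s2 = b.tick()        # simultaneous event on node 2: same counter, different node
s3 = b.observe(s1)   # node 2 receives s1, so s3 is ordered after s1
```

Because stamps compare by counter first and node ID second, two events with the same counter are still totally ordered, and no two nodes can ever produce the same stamp.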

Distributed TUNGSTEN

WOLFRAM provides distributed access to the local TUNGSTEN storage on each node. By referring to the cluster configuration and state stored within volumes, it can obtain the list of nodes which carry replicas of any given entity, and cross-reference that with node liveness information from the in-cluster IRIDIUM links to know which nodes would be good candidates to access entity state from.
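The cross-referencing step amounts to intersecting two sets. A minimal Python sketch follows, assuming node IDs are plain integers and that replica and liveness information are already available as sets; the function name read_candidates is invented for illustration.

```python
def read_candidates(replica_nodes: set[int], live_nodes: set[int]) -> list[int]:
    """Nodes that both carry a replica of the entity and are currently reachable."""
    return sorted(replica_nodes & live_nodes)


replicas = {1, 4, 7}    # nodes listed as carrying the entity
live = {1, 2, 3, 7}     # nodes currently reachable over in-cluster links
candidates = read_candidates(replicas, live)
```

A real implementation would presumably also rank the candidates by link quality; this sketch only filters them.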

Lamport timestamps are used to track the last modification to any given replicated TUNGSTEN tuple, disambiguated by the modifying node index, in the special area for WOLFRAM metadata reserved by TUNGSTEN.

Updates are specified as tuple assertions; such representation considerations as compound objects are handled purely locally on the node.

WOLFRAM replicates updates to all the TUNGSTEN replicas of that entity upon transaction commit, but it has to skip replicas that are currently unreachable. However, it keeps a log of update transactions that have not been fully replicated, and upon seeing the missing node again, replays them.
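The log-and-replay behaviour can be sketched as follows. This is a simplified in-memory Python model with invented names (ReplicationLog, commit, node_seen); updates are represented as opaque strings, and the distributed commit protocol itself is not modelled.

```python
from collections import defaultdict


class ReplicationLog:
    """Tracks committed updates that some replica has not yet received."""

    def __init__(self) -> None:
        # node ID -> updates still owed to that node, in commit order
        self.pending: dict[int, list[str]] = defaultdict(list)

    def commit(self, update: str, replicas: set[int], reachable: set[int]) -> set[int]:
        """Deliver to reachable replicas and log the update for the rest."""
        for node in replicas - reachable:
            self.pending[node].append(update)
        return replicas & reachable

    def node_seen(self, node: int) -> list[str]:
        """Return (and forget) the updates to replay to a node that is back."""
        return self.pending.pop(node, [])


log = ReplicationLog()
delivered = log.commit("u1", replicas={1, 2, 3}, reachable={1, 2})
log.commit("u2", replicas={1, 2, 3}, reachable={1, 2})
replayed = log.node_seen(3)   # node 3 reconnects and receives what it missed
```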

Also, a transaction may be flagged to "commit asynchronously", in which case it is simply replicated to every node without a distributed commit, meaning that it may appear on different nodes at different points in time.

The distributed storage system cooperates closely with NITROGEN to manage the overall state of the node.

Distributed data model

The use of distributed transactions means that, in any given group of currently-interconnected nodes, the shared state of a replicated entity will appear strongly consistent (except when asynchronous commits are used). However, the presence of link failures may cause transactions to arrive asynchronously even when synchronous commits were requested, so this can never be relied upon.

CARBON updates are always in the form of a new tuple to add. In effect, the TUNGSTEN storage in a section only ever grows, adding new tuples, with one exception: CARBON allows "negative tuples" of the form [c:not tuple], which cancel out corresponding "positive tuples". It also allows "negative rules" of the form [c:not [c:rule tuple]].

But how do we define "cancel out" in a distributed system with weak timing guarantees? That depends on a notion of what came before. Thankfully, we can provide that with the Lamport modification timestamps attached to tuples, which provide a global ordering on events that is consistent from all frames of reference.

As such, cancellation updates need to be stored in the local TUNGSTEN storage, so that they can correctly cancel out updates that arrive later but have an earlier timestamp; however, they can be marked as temporary records in TUNGSTEN so that they disappear after a reasonable amount of time chosen to be sufficient for all disconnected nodes to have had a chance to reconnect.
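One possible reading of these cancellation semantics is sketched below in Python: a positive tuple holds only if its stamp is later than the latest stored cancellation for it. The class name Section and its methods are invented, stamps are modelled as (Lamport counter, node ID) pairs, and the eventual expiry of stored cancellations is left out.

```python
class Section:
    """Toy model of one storage section; stamps are (Lamport counter, node ID)."""

    def __init__(self):
        self.asserted = {}    # tuple -> latest positive stamp seen
        self.cancelled = {}   # tuple -> latest negative stamp seen (kept on record)

    def apply(self, stamp, tup, negative=False):
        """Record an update, keeping only the latest stamp of each kind."""
        table = self.cancelled if negative else self.asserted
        if tup not in table or stamp > table[tup]:
            table[tup] = stamp

    def holds(self, tup):
        """A tuple holds if it was asserted after its latest cancellation."""
        pos = self.asserted.get(tup)
        neg = self.cancelled.get(tup)
        return pos is not None and (neg is None or pos > neg)


fact = ("colour", "red")
in_order, late_arrival = Section(), Section()

in_order.apply((3, 2), fact)                      # assertion arrives first
in_order.apply((5, 1), fact, negative=True)       # then its cancellation

late_arrival.apply((5, 1), fact, negative=True)   # cancellation arrives first
late_arrival.apply((3, 2), fact)                  # earlier assertion arrives late
```

Both nodes end up agreeing that the tuple is cancelled, regardless of arrival order, which is only possible because the cancellation was kept on record.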

Compound objects are handled below the level of individual updates, and so need to maintain their own update timestamps for their component parts, and will generally have their own "override rules" for contradicting tuples. For instance, an array with individually updatable elements will need to store the update timestamp of every element of the array, so that only the "latest" update asserting the content of an element is kept.
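The array example might look something like this in Python. StampedArray and assert_element are invented names, and stamps are again modelled as (Lamport counter, node ID) pairs; this is a sketch of the idea, not the actual compound object implementation.

```python
class StampedArray:
    """Array whose elements each remember the stamp of their latest update."""

    def __init__(self, size):
        self.values = [None] * size
        self.stamps = [(0, 0)] * size   # (Lamport counter, node ID) per element

    def assert_element(self, index, value, stamp):
        """Apply an update only if it is later than what the element holds."""
        if stamp > self.stamps[index]:
            self.values[index] = value
            self.stamps[index] = stamp
            return True
        return False


arr = StampedArray(2)
first = arr.assert_element(0, "new", (5, 1))
stale = arr.assert_element(0, "old", (3, 2))   # arrives later but is older
other = arr.assert_element(1, "x", (3, 2))     # a different element is unaffected
```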

Applications need to be written using suitable idioms to work well with the distributed state model. For instance, storing a counter in a tuple and emitting update transactions of the form ([c:not [counter-value 0]] [counter-value 1]) would be a bad idea, as two concurrent updates to the same counter would produce exactly the same update transaction, resulting in the counter ending up as 1 rather than 2. Instead, it would be better to log individual events as tuples of their own, and then add them up to find out how many have happened. Of course, that wouldn't scale well if done with plain tuples; but compound objects are capable of handling such an interface with an efficient implementation "under the hood".
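The difference between the two idioms can be seen in a small Python model that treats tuple storage as a set, so that asserting the same tuple twice stores it once. This set model and the "counter-event" tuple are simplifying assumptions made for illustration.

```python
# Naive counter: both nodes read 0 and emit the identical transaction.
naive = set()
for _node in ("node-a", "node-b"):
    naive.discard(("counter-value", 0))   # [c:not [counter-value 0]]
    naive.add(("counter-value", 1))       # [counter-value 1]

# Event log: each increment is its own tuple, made unique by its
# (Lamport counter, node ID) stamp, so concurrent events never collide.
events = set()
events.add(("counter-event", (7, 1)))
events.add(("counter-event", (7, 2)))

naive_total = max(value for _name, value in naive)
event_total = len(events)
```

The naive counter reports 1 after two increments, while counting the logged events gives 2.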

Content-addressable storage

TUNGSTEN also provides content-addressable storage on a per-node basis, and WOLFRAM uses that to provide a single logical CAS across the entire cluster, by having nodes gossip to each other about what CAS objects they hold. When a node needs a CAS object it doesn't hold locally, WOLFRAM will obtain it from another node, and (if space permits) will store a copy on the local node so that objects are automatically lazily replicated to where they're needed. CAS objects may also be tagged as needing a certain replication level, in which case WOLFRAM is responsible for eagerly replicating that object if required.
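The lazy replication path can be sketched in Python as below. Everything here is an assumption made for illustration: CasNode and fetch are invented names, SHA-256 stands in for whatever content hash is actually used, an object count stands in for disk space, and directly inspecting peers stands in for knowledge gained by gossip.

```python
import hashlib


class CasNode:
    """Toy per-node content-addressable store."""

    def __init__(self, name, capacity):
        self.name = name
        self.capacity = capacity   # max cached objects, standing in for disk space
        self.objects = {}

    def put(self, data: bytes) -> str:
        """Store data under its content hash (capacity is not enforced here)."""
        key = hashlib.sha256(data).hexdigest()
        self.objects[key] = data
        return key


def fetch(local, peers, key):
    """Return the object, copying it to the local node if space permits."""
    if key in local.objects:
        return local.objects[key]
    for peer in peers:
        if key in peer.objects:
            data = peer.objects[key]
            if len(local.objects) < local.capacity:
                local.objects[key] = data   # lazy replication to where it is needed
            return data
    raise KeyError(key)


holder = CasNode("holder", capacity=10)
roomy = CasNode("roomy", capacity=10)
full = CasNode("full", capacity=0)

key = holder.put(b"hello")
got_roomy = fetch(roomy, [holder], key)
got_full = fetch(full, [holder], key)
```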

TUNGSTEN will delete CAS objects when their expiry time passes, or if disk space is low and they have a drop cost configured; WOLFRAM is responsible for dealing with multiple requests to store the same CAS object with different expiry/drop settings and ensuring that the actual replicas in TUNGSTEN have appropriate settings to match the requirements of all references to the CAS object (for instance, if it's stored with a drop priority by reference A, with an expiry time by reference B, and with neither by reference C, it must be stored without a drop priority or expiry time so that C's reference won't be broken by dropping or expiry).
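A minimal Python sketch of how the settings of several references might be combined follows. The rule that a missing expiry or drop setting wins comes from the example above; taking the latest expiry and the highest drop cost when every reference specifies one is an assumption, as are the names Retention and effective_retention.

```python
from typing import NamedTuple, Optional


class Retention(NamedTuple):
    expiry: Optional[float]      # None = never expires
    drop_cost: Optional[float]   # None = never dropped to reclaim space


def effective_retention(refs: list[Retention]) -> Retention:
    """Combine per-reference settings so that no reference can be broken."""
    if not refs:
        raise ValueError("an object with no references has no retention settings")
    expiries = [r.expiry for r in refs]
    costs = [r.drop_cost for r in refs]
    expiry = None if None in expiries else max(expiries)
    drop_cost = None if None in costs else max(costs)   # assumed: keep the costliest
    return Retention(expiry, drop_cost)


ref_a = Retention(expiry=None, drop_cost=1.0)      # droppable, no expiry
ref_b = Retention(expiry=1000.0, drop_cost=None)   # expires, not droppable
ref_c = Retention(expiry=None, drop_cost=None)     # neither
combined = effective_retention([ref_a, ref_b, ref_c])
```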

The WOLFRAM metadata for CAS objects also includes a list of security classifications that the CAS object exists within. Callers may only access the CAS object if they possess suitable clearance. It won't replicate CAS objects to nodes that are not cleared for them, either.

WOLFRAM's most challenging task in managing the CAS is garbage collection. When references to CAS objects are explicitly deallocated via the WOLFRAM API, it must ensure that once there are no remaining references to a CAS object, it is eventually deleted in all the copies in TUNGSTEN. I think explicit reference counts will do for now, with objects with a zero refcount given a low drop cost so they'll be cleared up when the space is needed - but managing refcounts in a distributed system with failure without making mistakes requires care.

Cluster reconfiguration

WOLFRAM is also responsible for ensuring that each entity is sufficiently replicated within the volume, and that the entities are distributed evenly between the nodes mirroring a volume.

When an entity is created, WOLFRAM chooses sufficient nodes in the volume containing the entity to meet the replication factor requirements, prioritising nodes with fewer bytes of entities (from that volume) currently mirrored on them.
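The placement rule for new entities can be sketched in Python as a sort by current load. The function name choose_replica_nodes and the use of the node ID as a tie-breaker are assumptions made for this example.

```python
def choose_replica_nodes(bytes_by_node: dict[int, int], replication_factor: int) -> list[int]:
    """Pick the least-loaded nodes in the volume, breaking ties by node ID."""
    if replication_factor > len(bytes_by_node):
        raise ValueError("not enough nodes in the volume")
    ranked = sorted(bytes_by_node, key=lambda node: (bytes_by_node[node], node))
    return ranked[:replication_factor]


# Bytes of entities from this volume currently mirrored on each node.
load = {1: 500, 2: 100, 3: 300, 4: 100}
chosen = choose_replica_nodes(load, replication_factor=2)
```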

When an entity is deleted, if the nodes that mirrored it are now significantly underloaded compared to the others in terms of bytes of entities in that volume, then some entities may be relocated from more-loaded nodes.

When a node is added, WOLFRAM will shift some entity replicas onto that node to reduce the load on the other nodes.

When a node is removed, any entities that are now replicated to fewer nodes than the volume's replication factor will be copied to additional nodes within the volume, to restore the replication factor.

Relocating or copying an entity mirror is done by listing the entity as being mirrored on the target node, but with a marker stating that it is being transferred there, so that it should receive updates but not be queried there; this ensures that no updates are missed during the migration process. When the transfer is complete, the target is upgraded to a full replica, and the entity is removed from the source node (unless this is a copy operation). If any other nodes then try to access that entity on the source node, the source node will return an error indicating that the entity is not available there, and the other nodes will re-locate it automatically.
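The transfer marker can be modelled as a per-node state, as in the following Python sketch. MirrorState and EntityMirrors are invented names; the sketch only tracks which nodes receive updates and which may be queried, not the data transfer itself.

```python
from enum import Enum


class MirrorState(Enum):
    TRANSFERRING = "transferring"   # receives updates, must not be queried
    FULL = "full"


class EntityMirrors:
    """Toy record of where one entity is mirrored."""

    def __init__(self, nodes):
        self.state = {node: MirrorState.FULL for node in nodes}

    def begin_transfer(self, target):
        self.state[target] = MirrorState.TRANSFERRING

    def complete_transfer(self, target, source=None):
        """Upgrade the target; pass source to relocate, omit it to copy."""
        self.state[target] = MirrorState.FULL
        if source is not None:
            del self.state[source]

    def update_targets(self):
        return sorted(self.state)

    def query_targets(self):
        return sorted(n for n, s in self.state.items() if s is MirrorState.FULL)


mirrors = EntityMirrors([1, 2])
mirrors.begin_transfer(3)
updates_during = mirrors.update_targets()   # node 3 already receives updates
queries_during = mirrors.query_targets()    # but is not yet queried
mirrors.complete_transfer(3, source=1)      # relocation from node 1 to node 3
```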

Cluster reconfigurations are requested through administrative interfaces on the cluster entity itself. When a new node is added, a bootstrap configuration object is generated containing volume keys and the node's ID, which must be given to the new node in order to bootstrap itself into the cluster.

Entity management

An entity can be deleted, in which case all replicas of its TUNGSTEN state are freed.

Entities may be created by making a request to an existing entity's admin interface to create the new entity in the same volume. The target entity is not actually involved in this process at all, although it may store an ACL to control access to this operation; it exists purely as an anchor to select the volume to create the new entity inside.

Volume management

Entities are organised into volumes, and each TUNGSTEN node mirrors a specified set of volumes. There's a special node-volume for each node that is only mirrored on it, and there's a special cluster volume that is mirrored to every TUNGSTEN node (and contains the cluster entity). However, other volumes can be created manually through an interface on the cluster entity, and mapped to chosen nodes.

Each volume contains a "volume entity", which presents its administrative interface, although it does so with no cooperation from the entity itself (which can be any entity); that interface is provided by the TUNGSTEN kernel module.

Distributed processing

WOLFRAM provides distributed processing facilities to entity code. An entity may request that a specified LITHIUM handler and arguments be used as a distributed job generator. On a chosen set of nodes, WOLFRAM will install a HELIUM job generator callback at a specified priority level. This callback will invoke the specified LITHIUM handler in the context of the invoking entity, to obtain the ID of a second LITHIUM handler and arguments that represent the actual task to perform. This will continue until the original handler stops providing jobs; when all of the jobs have completed, the distributed job as a whole is complete. In effect, the job generator system implements a lazy list of jobs to do, allowing it to be computed on the fly.

For parallelisable tasks that aren't generated in bulk, it will also be possible to submit a single job to be run, as a LITHIUM handler ID and arguments, which may be distributed to a lightly-loaded node in the cluster if the local node is busy.
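The "lazy list of jobs" idea behind the job generator maps naturally onto a Python generator, as sketched below. The handler ID "index-chunk", the chunked-range workload, and the function names are all hypothetical; a generator handler is modelled as a callable that returns the next job or None when it has no more.

```python
from typing import Callable, Iterator, Optional

Job = tuple[str, tuple]   # (hypothetical handler ID, arguments)


def job_stream(generator: Callable[[], Optional[Job]]) -> Iterator[Job]:
    """Call the generator handler until it stops providing jobs."""
    while (job := generator()) is not None:
        yield job


def make_range_generator(handler_id: str, start: int, stop: int, step: int):
    """Build a generator handler that splits a range into chunk jobs."""
    position = start

    def next_job() -> Optional[Job]:
        nonlocal position
        if position >= stop:
            return None
        job = (handler_id, (position, min(position + step, stop)))
        position += step
        return job

    return next_job


jobs = list(job_stream(make_range_generator("index-chunk", 0, 25, 10)))
```

Jobs are only computed when a worker asks for the next one, so the full list never needs to exist in memory at once.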

OPEN QUESTION: Which nodes should be chosen? All nodes trusted to handle the entity's classification? Just nodes mirroring the entity? Something in between? As the cluster is a global resource, some form of global management may be needed. Perhaps entities should have a processor quota and priority that can be overridden by administrators. Think about how to choose.

OPEN QUESTION: Is it worth having a lighter-weight job generator interface where you provide a local next-job-please closure, and remote nodes call back via WOLFRAM to ask for new jobs? Actually distributing job generation (causing the job generator state to be synchronised at the TUNGSTEN level) might be too heavyweight.

Distributed caching

WOLFRAM also provides a distributed cache mechanism.

There are several caches in the system. As well as one cache created implicitly per node, which is stored only on that node, and a volume cache that covers all nodes hosting a volume, there may be other caches covering a specified set of nodes in the cluster configuration.

Cached objects have the usual TUNGSTEN temporary storage metadata - an expiry time and a drop cost to allow automatic removal when space is short; and in addition, if the drop cost is not specified (meaning "do not automatically delete"), the cache item can be given a replication factor, indicating how many nodes in the cache it should be replicated to for safety. Objects are allocated to persistent or transient storage depending on whether they have a drop priority, and whether storage of the ideally desired type is available.

The cached objects on each node are stored in a combination of TUNGSTEN (if the node has mass storage), using global temporary storage records, and in-memory storage; the storage tag specifies the cache identifier (to keep them disjoint) and the cache tag of the object.

Each node within a given cache will have its own cache limits for persistent (TUNGSTEN) and transient (RAM) storage; the allocation of cached objects uses these as weightings.

OPEN QUESTION: We know we want caching for system stuff like fetching data from CARBON, and it would be nice to provide caches to arbitrary entity code, but how to connect the requirements of entity code - which doesn't know the cluster topology - to a cache setup? Do we just provide a default cache per volume that entities in that volume use, and allow administrators to identify entities that need their caching behaviour "tweaked" and create an explicit cache for them?