Nikita Danilov
2008-Dec-22 07:53 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Hello,

a few proposals for a distributed recovery for the upcoming CMD release of Lustre were discussed recently. In my opinion, many of them (including the clients-epoch approach that I advocated) are very complex, and a simpler solution, one that can be fully understood, is needed. The following is an attempt to provide such a solution.

Nikita.

* * *

This is a strawman proposal. At least it would help us to settle the terminology.

The following describes an alternative distributed recovery mechanism. As this proposal is somewhat radically alternative, the exposition is rather abstract; please bear with it.

The summary is that the original `global epochs' [10] proposal is modified to involve all cluster nodes, including clients. This seeks to fix what is seen as a major drawback of the said algorithm: its reliance on master-slave processing.

Definitions:
===========

A _cluster_ consists of _nodes_. Every node has a volatile storage. Some nodes have persistent storage. Persistent means `surviving any failure considered in the model'.

Nodes exchange _messages_. A message X with a parameter Y, sent from a node N to a node M, is denoted as

        X(Y) : N -> M

Synchronous message send is denoted as

        X(Y) : N ->> M

It is, in reality, a sequence

        X(Y) : N -> M
        RepX : M -> N

of a send and a reply.

Nodes _join_ the cluster, and _part_ from the cluster. A node can be forcibly parted from the cluster---_evicted_.

An _operation_ is a `primitive' distributed modification of state, that moves the distributed state from one consistent state to another consistent state. `Primitive' because without such a qualification a valid sequence of operations would be an operation itself. An operation OP consists of _updates_ of the state of every node involved in this operation:

        OP = (U(0), ... U(n)),

where U(k) is an update for the node U(k).node.

A _reintegration_ of an operation is a process by which a node (by sending messages) requests other nodes to _execute_ updates of a given operation, i.e., to effect the corresponding state change in the node storage (volatile or persistent). Details of reintegration are described below.

A node with persistent storage supports _transactions_, which are means to declare that a sequence of updates, executed in volatile storage, must reach persistent storage atomically.

Two updates are _conflicting_ if their results (including success or failure indication) and the final state depend on the order of their execution.

For a given update U, a node N can send a message to U.node, requesting a _lock_ that will delay requests for locks for conflicting updates requested from other nodes until the lock is either released by another message or when N leaves the cluster. (In reality locks are taken on objects, but introducing them would complicate the exposition.)

Epoch Basics:
============

The core epochs algorithm is very simple. Every node N keeps in its volatile storage an _epoch number_, denoted N.epoch. Every message X is tagged with an epoch number that is denoted as X.epoch. These epoch numbers are maintained according to the following protocol:

        E1. On receiving X : M -> N, N sets

                N.epoch = max(N.epoch, X.epoch);

        E2. On sending X : N -> M, N sets

                X.epoch = N.epoch;

The assignments in E1 and E2 must be mutually atomic. Compare this with `Lamport timestamps' [1] and `vector clocks' [2].

Progressing toward new epochs will be described later; for now assume that there are multiple epoch numbers stored in node memories and traversing the network in messages at the same time.
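As an illustration only (not part of the proposal), E1/E2 can be sketched in a few lines of Python; the Node and Message classes and the mutex that makes the two rules mutually atomic are assumptions of the sketch, not prescribed by the algorithm:

    import threading

    class Message:
        def __init__(self, payload, epoch=0):
            self.payload = payload
            self.epoch = epoch                    # X.epoch

    class Node:
        def __init__(self):
            self.epoch = 0                        # N.epoch, kept in volatile storage
            self._lock = threading.Lock()         # makes E1 and E2 mutually atomic

        def on_receive(self, msg):                # E1
            with self._lock:
                self.epoch = max(self.epoch, msg.epoch)

        def send(self, payload):                  # E2: tag the outgoing message
            with self._lock:
                return Message(payload, epoch=self.epoch)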
Operations:
==========

O1. To reintegrate an operation OP = (U(0), ... U(n)), a node N

        - sends lock requests:

                LOCK(U(k)) : N ->> U(k).node;

        - sends reintegration messages:

                REINT(U(k)) : N -> U(k).node

          atomically w.r.t. E1.

        - adds U to the volatile `redo log'.

O1 doesn't require all LOCK messages to be synchronous and serialized: it's only necessary that replies to all LOCK messages are received before the first REINT message is sent.

We denote REINT(U).epoch as U.epoch (well-defined), and say that update U `is in the epoch U.epoch', and that the corresponding undo record (see O2) is a record `in epoch U.epoch'.

O2. On receiving REINT(U) : M -> N (where N == U.node), node N transactionally

        - executes U in the volatile storage, and

        - adds to the `undo log' a record [U, OP]

Note that U.epoch can be less than N.epoch at the time of execution (it cannot be greater than the latter due to E1).

We consider only single-level reintegration, where execution of an update requires no further reintegrations. Generalization to the multi-level case is left as an exercise for a curious reader.

Correctness:
===========

We can now prove a number of very simple statements:

S0: For a node N, N.epoch increases monotonically in time.

Proof: The only place where N.epoch is modified is E1, and this is obviously a non-decreasing function.

S1: A collection of all updates in a given epoch is precisely a collection of updates for some set of operations (i.e., an epoch contains no partial operations).

Proof: Obvious from O1: all updates for a given operation are sent in the same epoch.

S2: For any sequence of conflicting updates (U{0}, ... U{n}), the sequence (U{0}.epoch, ..., U{n}.epoch) is monotonically increasing.

Proof: Consider conflicting updates U{k} and U{k+1}. From O1 and the definition of locking it is immediately clear that the following sequence of message sends took place:

            LOCK(U{k})      : N -> S    ; request a lock for U{k}
            RepLOCK(U{k})   : S -> N    ; get the lock for U{k}
        (*) REINT(U{k})     : N -> S    ; reintegrate U{k}
            LOCK(U{k+1})    : M -> S    ; conflicting lock is requested by M
        (*) UNLOCK(U{k})    : N -> S    ; N yields the lock
        (*) RepLOCK(U{k+1}) : S -> M    ; M gets the lock
        (*) REINT(U{k+1})   : M -> S    ; reintegrate U{k+1}

Only the ordering of messages marked with (*) matters, the rest is just for completeness. Then

        U{k}.epoch == REINT(U{k}).epoch         ; by definition
                   <= UNLOCK(U{k}).epoch        ; by S0 for N and E2
                   <= RepLOCK(U{k+1}).epoch     ; by S0 for S and E2
                   <= REINT(U{k+1}).epoch       ; by S0 for M
                   == U{k+1}.epoch              ; by definition

In essence, S2 states that epoch ordering is compatible with the causal ordering of updates. An important consequence of this is that an epoch cannot `depend' on a future epoch. Note that the proof of S2 is very similar to the proof of serializability [7] of database schedules under the two-phase locking (2PL) protocol [3].

From S0, S1 and S2 it seems very plausible to conclude that

S3: For any epoch E, a collection of updates in all epochs up to and including E is precisely a collection of updates in some prefix of execution history. That is, for every node N, said collection contains updates from all operations reintegrated by N before some moment T in N's physical time, and no updates from operations reintegrated by N after T.

Alternatively, `an epoch boundary is a consistent state snapshot'.

We won't prove S3, as this requires formalizing the notions of global and local histories, distributed schedules, etc., which is more formalism than is tolerable at the moment.
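For concreteness, here is a minimal sketch of O1/O2, with the assumption that direct method calls stand in for the LOCK/REINT messages and that lock acquisition, replies and failure handling are elided; the class and field names are illustrative only:

    class Server:
        def __init__(self):
            self.epoch = 0
            self.undo_log = []                        # records [U, OP], tagged by epoch

        def reint(self, update, op, msg_epoch):       # O2, on node U.node
            self.epoch = max(self.epoch, msg_epoch)   # E1
            # transactionally: execute in volatile storage and add the undo record
            self.execute_volatile(update)
            self.undo_log.append((msg_epoch, update, op))

        def execute_volatile(self, update):
            pass                                      # actual state change elided

    class Client:
        def __init__(self):
            self.epoch = 0
            self.redo_log = []

        def reintegrate(self, op):                    # O1; op is a list of (server, update)
            # at this point replies to all LOCK requests are assumed to be in hand
            epoch = self.epoch                        # one epoch for the whole operation,
            for server, update in op:                 # read atomically w.r.t. E1
                server.reint(update, op, epoch)       # REINT(U(k)) : N -> U(k).node
            self.redo_log.append((epoch, op))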
Intermezzo:
==========

S3 is the main weapon in achieving correct distributed recovery: it claims that restoring the distributed state as of an epoch boundary results in a globally consistent state.

The key observation is that due to O2 every node with persistent storage has enough information to individually restore its state to the boundary of _any_ epoch, all updates from which it has on its persistent storage, even in the face of failures. Once all such nodes have agreed on a common epoch number, they restore their state independently. It is this agreeing on a single number, instead of agreeing on a common set of updates, that greatly simplifies recovery.

Advancing epochs:
================

So far no way to progress to the next epoch was introduced. If the algorithms described above were run as is, there would be only one epoch boundary: the initial file system state (as created by mkfs), and it would be the only point to which epoch-based recovery could restore the system.

A switch to the next epoch can be initiated by any node N, and is effected by

        E3. N.epoch++;

That's all. That is, multiple nodes can advance epochs completely independently, without any communication whatsoever. To understand why this is sound, recall the argument for S3: all it relies on is that epochs monotonically increase across a chain of _dependent_ messages, and to be involved in dependent operations nodes have to communicate (perhaps through another node), and their epoch numbers are synchronized by E1 and E2. E3 is executed atomically w.r.t. E1 and E2. Note that E3 doesn't break the epoch monotonicity assumed by S0.

To speed up the announcement of a new epoch, N

        E4. (optionally) sends null messages to some nodes.

The more null messages, if any, are sent to other nodes, the faster news about the new epoch spreads across the cluster. In the extreme case, N broadcasts the announcement to the whole cluster. Note that there are no synchrony requirements for the null messages: it is perfectly valid, for example, that N is still sending them when another node has already started sending the next round of announcements.

There is a great laxity in deciding when to switch to the next epoch. Possible variants include: on every reintegration (an extreme case), on a timeout, after a certain number of updates in the existing epoch, etc. Similarly, it is a matter of policy to allow all or only select nodes to advance epochs.
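Continuing the Node sketch from above (illustration only; the announce_to set and the transport's null-message helper are assumptions of the sketch), E3/E4 amount to:

    def advance_epoch(node, announce_to=(), transport=None):
        with node._lock:                  # atomic w.r.t. E1 and E2
            node.epoch += 1               # E3: that's all
            epoch = node.epoch
        for peer in announce_to:          # E4: optional, any subset of nodes (or none)
            transport.send_null(peer, epoch)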
Retiring epochs:
===============

The description above outlines, in principle, a workable system on top of which distributed recovery can be implemented. Yet a flaw is hidden in O2: the `undo log' can only grow, and no way to limit its size is indicated. While from some points of view (audit, undelete, backup) a complete log of system updates from the beginning of time is useful, it is generally unacceptable to keep an O(operations) rather than O(data + metadata) state on the persistent storage. To this end, a mechanism to prune the undo log without sacrificing correctness is necessary.

Clearly, an entry can be pruned from an undo log the moment it is guaranteed that the corresponding update will never be undone as a part of restoring a consistent state during recovery. As our goal is to restore to an epoch boundary, all undo entries for a given epoch are discardable if one of them is. To understand which epochs can be retired, let's look at the epochs that cannot be.

Obviously, an epoch cannot be discarded from an undo log if some of its updates are in volatile storage only: if the nodes with these volatile updates fail, the epoch can never be completed, and has to be undone completely.

From this it is tempting to conclude that an epoch can be pruned from undo logs once all of its updates are on the persistent storage, but, welladay [4], this is not so, because even though a given epoch can be everywhere stabilized to the persistent storage, some of its preceding epochs can still be volatile. This in fact is the only obstacle: an epoch can be pruned from undo logs as soon as it and all preceding epochs are everywhere stabilized.

Note that because epochs are advanced independently, updates for a given epoch can be spread across all nodes, and the only safe way to learn about everywhere stable epochs is to ask every node in the cluster what is the oldest epoch for which it has updates in volatile storage only. Finding out everywhere stable epochs can be done across various `topologies' [5]: star, ring, etc. We shall discuss the simplest star model, but see below.

Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined as the earliest epoch that still has, on this node, updates in volatile memory only.

For a server node, N.lvepoch is the epoch of the earliest update that was executed, but hasn't yet committed to the persistent storage.

For a client node, N.lvepoch is the epoch of the earliest reintegration that has at least one update that hasn't been committed to the stable storage on the corresponding server.

Note that N.lvepoch does _not_ necessarily increase monotonically with time, as a node can receive, as a part of reintegration, updates with an epoch smaller than any epoch it has seen before. The following however holds:

S4: For any node N, N.lvepoch <= N.epoch, at any time.

Proof: if N received an update U as a part of reintegration, N.epoch was updated by E1 as part of REINT(U) processing, assuring that U.epoch <= N.epoch. If U originates on N, it is tagged with the current node epoch, so U.epoch == N.epoch. Since that moment, N.epoch continues to increase monotonically, guaranteeing that U.epoch <= N.epoch for any volatile (or stable, for that matter) update U on N. Therefore,

        N.lvepoch == min{U.epoch | volatile update U at N} <= N.epoch;

A node SC (Stability Coordinator) is selected in the cluster configuration. SC monitors

        - cluster membership: every node N sends

                HAS_VOLATILE(N, N.epoch) : N ->> SC

          when it enters the cluster (where N.epoch is set up during the initial
          hand-shake with the cluster entry node) and

                HAS_VOLATILE(N, +infinity) : N ->> SC

          when it parts from the cluster. When a node N is evicted by a node M, a

                HAS_VOLATILE(N, +infinity) : M ->> SC

          is sent.

        - the oldest locally volatile epoch for every node, as an array SC.lvepoch[].

These data are updated as follows:

        E5. Periodically every node N sends

                HAS_VOLATILE(N, N.lvepoch) : N -> SC.

        E6. On receiving HAS_VOLATILE(N, lvepoch) : M -> SC, SC sets

                SC.lvepoch[N] = lvepoch;

        E7. When min{SC.lvepoch[*]} changes, SC broadcasts

                MIN_VOLATILE(min{SC.lvepoch[*]}) : SC -> N

            to every node N.

Protocol E5--E7 implements a `stability algorithm'. Clearly, the stability algorithm aligns very well with tree reduction [6]: in a typical cluster clients will report their oldest volatile epochs to the servers, which compute the minimum across their children and forward it upward until the root node is reached, from where the global minimum is propagated back.
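The coordinator side of E5--E7 is tiny; a sketch follows, with the assumptions that +infinity is modeled as a floating-point infinity and that broadcast() is a hypothetical helper delivering MIN_VOLATILE to every node:

    import math

    class StabilityCoordinator:
        def __init__(self):
            self.lvepoch = {}                    # SC.lvepoch[], one entry per node

        def on_has_volatile(self, node_id, lvepoch, broadcast):      # E6
            old_min = min(self.lvepoch.values(), default=math.inf)
            self.lvepoch[node_id] = lvepoch      # +infinity on part or eviction
            new_min = min(self.lvepoch.values(), default=math.inf)
            if new_min != old_min:               # E7
                broadcast(("MIN_VOLATILE", new_min))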
S5: When a node N receives MIN_VOLATILE(E) : SC -> N, it can safely prune all epochs older than E from its undo logs.

Proof: Imagine that some epoch earlier than E is somewhere volatile at the moment when N receives the message above, that is, some node M has a volatile update U such that U.epoch < E. Let Q be the node that originated U (this might be M or some other node). We have the following sequence of messages:

        HAS_VOLATILE(Q, lvepoch) : X -> SC
        MIN_VOLATILE(E)          : SC -> N

where the last such stability message concerning Q is meant. If X != Q, that is, Q has been evicted by a certain node X, then, by the definition of the eviction process (see below), the following sequence of messages took place:

        HAS_VOLATILE(M, M.lvepoch) : M ->> SC
        HAS_VOLATILE(Q, +infinity) : X ->> SC
        MIN_VOLATILE(E)            : SC -> N

and we have

        U.epoch >= M.lvepoch            ; by the definition of M.lvepoch
                >= min{SC.lvepoch[*]}   ; by E6
                == E                    ; by E7.

Contradiction. Hence, X == Q, and Q hasn't been evicted. If lvepoch > U.epoch, then by the definition of Q.lvepoch, Q has been informed by the servers (including M) that all updates in U.epoch, including U, have been stabilized, which contradicts the initial assumption about U. Hence,

        U.epoch >= Q.lvepoch
                >= min{SC.lvepoch[*]}   ; by E6
                == E                    ; by E7.

Contradiction.

Redo logs:
=========

The problem of pruning redo logs, filled by O1, is much simpler: once a record for an update U is discarded from the undo log, the corresponding record can be discarded from the redo log too, because if the record is never undone, there will never be a need to redo it. This policy is conservative, because redo logs can be pruned much more aggressively, yet it is simple, and all infrastructure for it already exists.

Recovery:
========

Let's, after all, describe what the recovery process looks like. There are two types of recovery:

        - eviction: it happens when a node without persistent storage fails. In this
          case, some other node takes up the task of restoring a consistent state, and

        - recovery (proper), which happens when a node with persistent storage fails.
          In this case the failed node initiates the distributed recovery algorithm
          when it restarts.

When a node N decides to evict a victim node V, it

V1. sends EVICT_PREP(V) : N -> M to all nodes that might potentially keep volatile updates for V (typically, all servers, including N).

V2. On receiving EVICT_PREP(V) : N -> M, node M

        - records that V is evicted, denying all future messages from it,

        - sends HAS_VOLATILE(M, M.lvepoch) : M ->> SC,

        - finds all locks it granted to V,

        - finds in its undo log all records [U, OP] where U is protected by these locks
          (U.node == V of course), and adds them to the list L,

        - sends EVICT_ACK(V, L) : M -> N.

V3. On receiving EVICT_ACK(V, L) : M -> N from all nodes, N

        - removes from every L all entries that together match a complete operation.

        - if all L lists are now empty, then all updates from all operations
          reintegrated by V reached other nodes, and there is nothing to do;

        - otherwise, some of the updates were lost, and other updates from the same
          operations have to be undone, which might require undoing yet other updates,
          including updates not protected by the locks held by V, and updates made by
          other clients. N initiates proper recovery (see below), which can be started
          immediately from step R4, by sending

                RECOVERY_COMMIT(N, min{U.epoch | U in L}) : N ->> M.

V4. In any case, eviction is finished by sending HAS_VOLATILE(V, +infinity) : N ->> SC.
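The bookkeeping behind V3 ("entries that together match a complete operation") can be sketched as follows; it assumes, purely for illustration, that every undo record carries an operation identifier and that the operation's total number of updates is known:

    from collections import Counter

    def partially_lost_ops(ack_lists, op_size):
        # ack_lists: {server: [(op_id, update), ...]}, the L lists from EVICT_ACK(V, L)
        # op_size:   {op_id: total number of updates in that operation}
        seen = Counter()
        for entries in ack_lists.values():
            for op_id, _update in entries:
                seen[op_id] += 1
        # an operation is complete iff every one of its updates reached some server;
        # anything else was partially lost and forces proper recovery (from step R4)
        return {op_id for op_id, n in seen.items() if n < op_size[op_id]}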
This algorithm is `obviously' correct, as it either

        - discards V's volatile storage in the case when the contents of this storage
          are duplicated on other nodes (thus the global state is not changed), or

        - invokes proper recovery.

Many optimizations are possible:

        - M can omit `local' operations from L,

        - tree reduction of the L construction,

        - nodes might force updates to the persistent storage (commit-on-eviction) to
          reduce the risk of a future proper recovery failing due to a missing V redo log.

The unfortunate fact that eviction might force undoing updates made by other clients and, hence, cascading evictions, is a direct consequence of the weaker isolation level implied by O1 and O2, viz. the ability to read data modified by an update that is a part of an operation whose other updates haven't reached their target nodes yet. This is similar to the `cascading aborts' that arise due to reads of uncommitted data [8], and can be addressed by a very simple mechanism:

O3: For any operation OP1 = (U(0), ... U(n)), a REINT(U(k)) message can be sent only after, for any operation OP0 containing an update conflicting with any of the U(i), replies to all OP0 reintegration messages have been received.

That is, a new reintegration can start only after all conflicting reintegrations have fully completed, where `conflict' is understood to be between operations rather than between individual updates. With the modest decrease in reintegration concurrency introduced by this mechanism, which we are going to call `volatile-on-share' (because on conflict updates are forced to at least the volatile storage of their target nodes; compare with commit-on-share [11]), the eviction algorithm can be simplified as follows:

V3'. On receiving EVICT_ACK(V, L) : M -> N from all nodes, N

        - removes from every L all entries that together match a complete operation.

        - for any non-empty L, N sends EVICT_COMMIT(V, L) : N ->> M

V4'. On receiving EVICT_COMMIT(V, L) : N ->> M, node M

        - undoes all updates in L, with O3 guaranteeing that no conflicting updates exist.

        - releases all locks found in V2.

V5'. N finishes eviction by sending HAS_VOLATILE(V, +infinity) : N ->> SC.

Proper recovery.

We shall assume that the transaction system on nodes with persistent storage maintains commit ordering: if transactions T0 and T1 contain conflicting updates U0 and U1, U0 precedes U1 in time, and T1 has been committed to the persistent storage, then so has T0. (If a pair of transactions has multiple conflicting updates, they all have to be in the same order, otherwise the transactions are not serializable.)

The rough recovery plan is to

        - find out which is the latest everywhere stable epoch (by running the
          stability algorithm described above),

        - undo all epochs up to the epoch found, and

        - apply all available redo logs to restore as much state as possible.

Some node (presumably a node that failed and restarted) acts as a recovery coordinator (RC). RC maintains an `oldest somewhere volatile epoch' RC.svepoch as described below. The following protocol (a two phase commit protocol [9] for RC.svepoch, in fact) is executed:

R1. RC sends RECOVERY_PREP(RC) : RC -> N to all nodes with persistent storage.

R2. On receiving RECOVERY_PREP(RC) : RC -> N, node N sends

        HAS_VOLATILE(N, N.lvepoch) : N -> RC

R3. On receiving HAS_VOLATILE(N, lvepoch) : N -> RC, RC sets

        RC.svepoch = min(RC.svepoch, lvepoch).

R4. Once RC has received HAS_VOLATILE messages from all servers, it broadcasts

        RECOVERY_COMMIT(RC, RC.svepoch) : RC ->> N

R5. On receiving RECOVERY_COMMIT(RC, E), node N undoes all updates it has in its logs in all epochs starting from E.
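The coordinator's part of R1--R5 is a small loop; a sketch, assuming hypothetical server proxies with synchronous has_volatile() and recovery_commit() calls and eliding the persistent 2PC state:

    import math

    def run_recovery(servers):
        svepoch = math.inf
        for s in servers:                     # R1/R2/R3: collect N.lvepoch from the servers
            svepoch = min(svepoch, s.has_volatile())
        for s in servers:                     # R4: broadcast the decision
            s.recovery_commit(svepoch)        # R5 runs on each node: undo epochs >= svepoch
        return svepoch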
Nodes might need to keep some persistent state to guarantee recovery progress in the face of repeated failures, in the standard 2PC fashion.

Once the undo part of recovery is finished, clients are asked to push their redo logs, starting from epoch RC.svepoch, to the servers, before they can start requesting new reintegrations. The usual Lustre algorithm (version based recovery) can be used here, with nodes evicted when they cannot redo updates due to some other client's failure.

Belle Epoque:
============

The proposed algorithm has the following advantages:

        - it is very simple. Surprisingly, with some effort, it even seems to be
          amenable to a more or less complete formal analysis;

        - it is non-blocking: in no event does `normal processing', i.e.,
          reintegration, have to block waiting for some epoch-related processing;

        - it is scalable, provided failures are relatively rare.

Its main disadvantage is that due to the clients' participation in the stabilization algorithm, a failed client can delay detection of everywhere stable epochs, and, hence, lead to larger undo-redo lists and a longer recovery. It seems that in the worst case a sequence of client failures can delay detection of epoch stabilization by a timeout times some small constant.

References:
==========

[1] Lamport timestamps: http://en.wikipedia.org/wiki/Lamport_timestamps
[2] Vector clocks: http://en.wikipedia.org/wiki/Vector_clocks
[3] Two phase locking: http://en.wikipedia.org/wiki/Two_phase_locking
[4] Nurse: Ah, well-a-day! he's dead, he's dead, he's dead! We are undone, lady, we are undone!
[5] Network topology: http://en.wikipedia.org/wiki/Network_topology
[6] Tree reduction: http://en.wikipedia.org/wiki/Graph_reduction
[7] Serializability: http://en.wikipedia.org/wiki/Serializability
[8] Recoverability: http://en.wikipedia.org/wiki/Schedule_(computer_science)#Avoids_cascading_aborts_.28rollbacks.29
[9] Two-phase commit: http://en.wikipedia.org/wiki/Two-phase_commit_protocol
[10] Cuts: http://arch.lustre.org/index.php?title=Cuts
[11] Commit on share: http://arch.lustre.org/index.php?title=Commit_on_Share
Alex Zhuravlev
2008-Dec-22 11:52 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Hello,

Nikita Danilov wrote:
> For a given update U, a node N can send a message to U.node, requesting a
> _lock_ that will delay requests for locks for conflicting updates requested
> from other nodes until the lock is either released by another message or when
> N leaves the cluster. (In reality locks are taken on objects, but introducing
> them would complicate the exposition.)

I find this reliance on an explicit request (a lock in this case) a disadvantage: a lock can be taken long before reintegration, meaning an epoch might be pinned for a long time, pinning in turn a lot of undo/redo logs. It's also not very clear how fsync(2) and similar requests would work with such pinned epochs - broadcast to release, or change the on-client epoch? Another point is that some operations can be lockless: for example, we were planning to skip extents locking for exclusively open files, while an epoch could be used by SNS for data.

> Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined
> as an earliest epoch that still has on this node updates in the volatile
> memory only.
> For a client node, N.lvepoch is an epoch of the earliest reintegration that
> has at least one update that hasn't been committed to the stable storage on
> the corresponding server.

this means the client actually has to maintain many epochs at the same time, as any lock enqueue can advance the epoch.

> A node SC (Stability Coordinator) is selected in the cluster configuration. SC
> monitors

I think having an SC is also a drawback:
1) choosing such a node is additional complexity and delay
2) failure of such a node would need a global resend of states
3) many unrelated nodes can get stuck due to large redo logs

> These data are updated as follows:
>
> E5. Periodically every node N sends
>
>         HAS_VOLATILE(N, N.lvepoch) : N -> SC.
>
> E6. On receiving HAS_VOLATILE(N, lvepoch) : M -> SC, SC sets
>
>         SC.lvepoch[N] = lvepoch;
>
> E7. When min{SC.lvepoch[*]} changes, SC broadcasts
>
>         MIN_VOLATILE(min{SC.lvepoch[*]}) : SC -> N
>
>     to every node N.
>
> Protocol E5--E7 implements a `stability algorithm'.

given the current epoch can be advanced by a lock enqueue, a client can have many epochs in use at the same time, thus we'd have to track them all in the protocol.

> Clearly, stability algorithm aligns very well with the tree reduction [6]: in
> a typical cluster clients will report their oldest volatile epochs to the
> servers, that would compute minimum across their children and forward it
> upward until the root node is reached, from where the global minimum is
> propagated back.

I'm not sure it scales well, as any failed node may cause a global stall in undo/redo pruning.

> Redo logs:
> =========
>
> The problem of pruning redo logs, filled by O1 is much simpler: once a record
> for an update U is discarded from the undo log, corresponding record can be
> discarded from the redo log too, because if record is never undone, there will
> never be a need to redo it. This policy is conservative, because redo logs
> can be pruned much more aggressively, yet, it is simple, and all
> infrastructure for it already exists.

it's probably simpler, but a single node suffers a lot from this global dependency: there might be a lot of epochs under work and lots of RPCs (especially with tree reduction) before a client can discard redo. I don't think this really scales well with 100K and more nodes.

> Proper recovery.
> The rough recovery plan is to
>
>         - find out which is the latest everywhere stable epoch (by running the
>           stability algorithm described above),

It's not very clear how a server finds an epoch stable in the case of a total power off: no client can provide this data.

>         - undo all epochs up to the epoch found, and
>
>         - apply all available redo logs to restore as much state as possible.
>
> Some node (presumably a node that failed and restarted) acts as a recovery
> coordinator (RC). RC maintains an `oldest somewhere volatile epoch' RC.svepoch
> as described below.
>
> The following protocol (a two phase commit protocol [9] for RC.svepoch, in fact)
> is executed:
>
> R1. RC sends RECOVERY_PREP(RC) : RC -> N to all nodes with persistent storage.
>
> R2. On receiving RECOVERY_PREP(RC) : RC -> N, node N sends
>
>         HAS_VOLATILE(N, N.lvepoch) : N -> RC
>
> R3. On receiving HAS_VOLATILE(N, lvepoch) : N -> RC, RC sets
>
>         RC.svepoch = min(RC.svepoch, lvepoch).
>
> R4. Once RC has received HAS_VOLATILE messages from all servers, it broadcasts
>
>         RECOVERY_COMMIT(RC, RC.svepoch) : RC ->> N
>
> R5. On receiving RECOVERY_COMMIT(RC, E), node N undoes all updates it has in
>     its logs in all epochs starting from E.
>
> Nodes might need to keep some persistent state to guarantee recovery progress
> in the face of repeated failures, in the standard 2PC fashion.
>
> Once the undo part of recovery is finished, clients are asked to push their redo
> logs, starting from epoch RC.svepoch, to the servers, before they can start
> requesting new reintegrations. The usual Lustre algorithm (version based recovery)
> can be used here, with nodes evicted when they cannot redo updates due to
> some other client's failure.

while one may find this simple, I think we shouldn't sacrifice scalability and performance for simplicity.
Instead we could do the following:

* when a client issues a transaction it labels it with a unique id
* a server executing the operation atomically writes an undo record with:
  * VBR versions, so that we can build chains of really dependent operations
  * the unique transaction id generated by the client
  * the number of servers involved in the transaction
* periodically servers exchange their committed unique transaction ids
  (only distributed transactions are involved in this)
* once some distributed transaction is committed on all involved servers, we can prune
  it and all its local successors
* during recovery:
  * first, all capable clients replay their redo (replay queue)
  * servers read their undo logs, find distributed transactions
  * servers exchange their distributed transaction ids
  * servers find partially committed distributed transactions
  * servers undo partially committed distributed transactions and all depending on them

I see the following advantages of this dependency-based approach:

* only servers are involved
* no single point of failure that may cause many nodes to block due to large redo logs
* the client doesn't need to track many global epochs - just use the current mechanism,
  no changes on the client side
* no reliance on some protocol like ldlm
* support for lockless operations
* with late replay we don't need to update redo with some new epoch
* doesn't depend on the current cluster state:
  * can forward transactions via intermediate nodes
  * may be important for complex setups over WAN
* fsync(2) and synchronous requests can be implemented in an optimal way
* support for global and local epochs with no additional code
* the amount of network overhead is proportional to the number of distributed transactions:
  * a server just needs to send arrays of transaction ids to other servers
  * much better batching compared to the above
  * with 32K distributed transactions per second and a 16-byte unique transaction id,
    a server would need to send ~2.5MB per 5 seconds
  * if a server is told who the other transaction participants are, then this exchange
    can be very efficient
* no need of undo for non-dependent changes:
  * in the simplest form - no uncommitted distributed transactions in undo before
  * in the complex form - tracking real dependency at execution time
  * it means in many cases recovery can be very fast
* recovery can be completed quickly as undo is smaller and we undo-redo very selectively
* as long as no distributed transactions are issued (A works with /home/a living on mds1,
  B works with /home/b living on mds2) no epoch-related activity is required, including undo

thanks, Alex
Nikita Danilov
2008-Dec-22 12:45 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
 > Hello,

Hello,

 > Nikita Danilov wrote:
 > > For a given update U, a node N can send a message to U.node, requesting a
 > > _lock_ that will delay requests for locks for conflicting updates requested
 > > from other nodes until the lock is either released by another message or when
 > > N leaves the cluster. (In reality locks are taken on objects, but introducing
 > > them would complicate the exposition.)
 >
 > I find this relying on explicit request (lock in this case) as a disadvantage:
 > lock can be taken long before reintegration meaning epoch might be pinned for

Hm.. a lock doesn't pin an epoch in any way.

 > long pinning in turn a lot of undo/redo logs. It's also not very clear how fsync(2)
 > and similar requests will be working with such pinned epochs - broadcast to release
 > or change on-client epoch? Another point is that some operation can be lockless:
 > for example, we were planning to skip extents locking for exclusively open files
 > while epoch could be used by SNS for data.

Locks are only needed to make the proof of S2 possible. Once lockless operation or SNS guarantee in some domain-specific way that no epoch can depend on a future one, we are fine.

 > > Every node N maintains an `oldest locally volatile epoch' N.lvepoch, defined
 > > as an earliest epoch that still has on this node updates in the volatile
 > > memory only.
 > > For a client node, N.lvepoch is an epoch of the earliest reintegration that
 > > has at least one update that hasn't been committed to the stable storage on
 > > the corresponding server.
 >
 > this means client actually should maintain many epochs at same time as any lock
 > enqueue can advance epoch.

I don't understand what is meant by "maintaining an epoch" here. An epoch is just a number. Surely a client will keep in its memory (in the redo log) a list of updates tagged by multiple epochs, but I don't see any problem with this.

 > > A node SC (Stability Coordinator) is selected in the cluster configuration. SC
 > > monitors
 >
 > I think having SC is also drawback:
 > 1) choosing such node is additional complexity and delay
 > 2) failing of such node would need global resend of states
 > 3) many unrelated nodes can get stuck due to large redo logs

As I pointed out, only the simplest `1-level star' form of the stability algorithm was described, for simplicity. This algorithm is amenable to a lot of optimization, because it, in effect, has to find a running minimum in a distributed array, and this can be done in a scalable way:

    Clearly, the stability algorithm aligns very well with tree reduction [6]: in
    a typical cluster clients will report their oldest volatile epochs to the
    servers, which compute the minimum across their children and forward it
    upward until the root node is reached, from where the global minimum is
    propagated back.

Note that this requires _no_ additional rpcs from the clients.

 > > Protocol E5--E7 implements a `stability algorithm'.
 >
 > given current epoch can be advanced by lock enqueue, client can get many used
 > epochs at same time, thus we'd have to track them all in the protocol.

I am not sure I understand this. _Any_ message (including lock enqueue, REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch of the receiving node.
 > > > Clearly, stability algorithm aligns very well with the tree reduction [6]: in
 > > > a typical cluster clients will report their oldest volatile epochs to the
 > > > servers, that would compute minimum across their children and forward it
 > > > upward until the root node is reached, from where the global minimum is
 > > > propagated back.
 >
 > I'm not sure it scales well as any failed node may cause global stuck in undo/redo
 > pruning.

Only until this node is evicted, and I think that no matter what the pattern of failures is, a single level of `tree reduction' can be delayed by no more than a single eviction timeout.

 > > Redo logs:
 > > =========
 > >
 > > The problem of pruning redo logs, filled by O1 is much simpler: once a record
 > > for an update U is discarded from the undo log, corresponding record can be
 > > discarded from the redo log too, because if record is never undone, there will
 > > never be a need to redo it. This policy is conservative, because redo logs
 > > can be pruned much more aggressively, yet, it is simple, and all
 > > infrastructure for it already exists.
 >
 > it's probably simpler, but single node suffers from this global dependency much:
 > there might be a lot of epochs under work and lots of RPCs (especially with tree
 > reduction) before client can discard redo. I don't think this really scales well
 > with 100K and more nodes.

Actually, a single-server operation can be discarded from a redo log as soon as it commits on the target server, because the latter can always redo it (possibly after undo). Given that the majority of operations are single server, redo logs won't be much larger than they are to-day.

 > > Proper recovery.
 > > The rough recovery plan is to
 > >
 > >         - find out which is the latest everywhere stable epoch (by running the
 > >           stability algorithm described above),
 >
 > It's not very clear how server finds epoch stable in case of total power off:
 > no client can provide this data.

The `marker' of the last everywhere stable epoch is the end of the undo log---when a server receives a MIN_VOLATILE message, it prunes all everywhere stable epochs from its undo log, so on recovery servers simply exchange the oldest epochs in their logs, and find the youngest of them. All the epochs before this one are everywhere stable (because at least one server pruned them from its undo logs and hence it received a MIN_VOLATILE message authorizing it to do so), and every server can roll back to it. So, rolling all servers back to this epoch is possible and restores a consistent snapshot.

Nikita.
Alexander Zarochentsev
2008-Dec-22 13:48 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
On 22 December 2008 15:45:51 Nikita Danilov wrote:
> Alex Zhuravlev writes:
>  > Hello,
>
>  > I'm not sure it scales well as any failed node may cause global
>  > stuck in undo/redo pruning.
>
> Only until this node is evicted, and I think that no matter what is
> the pattern of failures, a single level of `tree reduction', can be
> delayed by no more than a single eviction timeout.

It introduces an unneeded dependency between nodes: a node cannot prune its own undo logs until all nodes have an agreement that the epoch can be pruned. IMO it is what a scalable system should avoid.

If we had a disaster in a part of the cluster, client nodes would disconnect and reconnect often, the undo logs would be overloaded, and the cluster would stop, no?

Thanks,
--
Alexander "Zam" Zarochentsev
Staff Engineer
Lustre Group, Sun Microsystems
Nikita Danilov
2008-Dec-22 14:21 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alexander Zarochentsev writes:
 > On 22 December 2008 15:45:51 Nikita Danilov wrote:
 > > Alex Zhuravlev writes:
 > > > Hello,
 > >
 > > > I'm not sure it scales well as any failed node may cause global
 > > > stuck in undo/redo pruning.
 > >
 > > Only until this node is evicted, and I think that no matter what is
 > > the pattern of failures, a single level of `tree reduction', can be
 > > delayed by no more than a single eviction timeout.
 >
 > It introduces unneeded dependency between nodes, any node cannot prune
 > its own undo logs until all nodes have an agreement that the epoch can be
 > pruned. IMO it is what scalable system should avoid.

This is a price paid for the cheap introduction of new epochs. If the epoch scope is limited to a known group of nodes, then retiring such an epoch requires consensus only between the nodes of this group (cheaper than a global consensus), but the introduction of new epochs requires coordination between groups. In various designs that we considered where epochs are per-client, this manifests itself as an absence of total ordering between epochs, which requires translation between client epochs and server transaction identifiers.

All in all, I have a feeling that _all_ such algorithms have similar communication overhead for the `usual' workload.

 > If we would have a disaster in a part of the cluster, client nodes would
 > disconnect and reconnect often, the undo logs will be overloaded, and
 > the cluster will stop, no?

Well, it won't stop, because a node either manages to reconnect in time (in which case it communicates its state to the superior), or it is evicted on a timeout. In any case, the stabilization algorithm progresses.

Then, I think that even the simplest global epoch based recovery is very challenging to implement.

 > Thanks,
 > --
 > Alexander "Zam" Zarochentsev

Nikita.
Alex Zhuravlev
2008-Dec-22 14:44 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> > I find this relying on explicit request (lock in this case) as a disadvantage:
> > lock can be taken long before reintegration meaning epoch might be pinned for
>
> Hm.. a lock doesn't pin an epoch in any way.

well, I think it does, as you don't want to use an epoch received a few minutes ago with a lock. if a node is in WBC mode and granted some STL-like lock, then it may be sending a few-MB batch every, say, 5 minutes. there might be no interaction between batches. this means the client would need to refresh the epoch. depending on the workload it may happen that the client won't be able to send a batch awaiting a new epoch, or the client may refresh the epoch with no real batches after that.

> Locks are only needed to make proof of S2 possible. Once lockless
> operation or SNS guarantee in some domain-specific way that no epoch can
> depend on a future one, we are fine.

well, I guess "in some domain-specific way" means another complexity.

> > this means client actually should maintain many epochs at same time as any lock
> > enqueue can advance epoch.
>
> I don't understand what is meant by "maintaining an epoch" here. Epoch
> is just a number. Surely a client will keep in its memory (in the redo
> log) a list of updates tagged by multiple epochs, but I don't see any
> problem with this.

the problem is that with out-of-order epochs sent to different servers the client can't use the notion of "last_committed" anymore.

> > I think having SC is also drawback:
> > 1) choosing such node is additional complexity and delay
> > 2) failing of such node would need global resend of states
> > 3) many unrelated nodes can get stuck due to large redo logs
>
> As I pointed out, only the simplest `1-level star' form of the stability
> algorithm was described, for simplicity. This algorithm is amenable to
> a lot of optimization, because it, in effect, has to find a running
> minimum in a distributed array, and this can be done in a scalable way:

the bad thing, IMHO, in all this is that all nodes making the decision must understand the topology. a server should separate epochs from different clients; it's hard to send batches via some intermediate server/node.

> Note that this requires _no_ additional rpcs from the clients.

disagree. at least for distributed operations the client has to report its non-volatile epoch from time to time. in some cases we can use a protocol like ping, in some - not.

> > given current epoch can be advanced by lock enqueue, client can get many used
> > epochs at same time, thus we'd have to track them all in the protocol.
>
> I am not sure I understand this. _Any_ message (including lock enqueue,
> REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch
> of the receiving node.

correct, this means the client may have many epochs to track. thus no last_committed anymore.

> Only until this node is evicted, and I think that no matter what the
> pattern of failures is, a single level of `tree reduction' can be delayed
> by no more than a single eviction timeout.

the problem is that it may affect unrelated nodes very easily.

> Actually, a single-server operation can be discarded from a redo log as
> soon as it commits on the target server, because the latter can always
> redo it (possibly after undo). Given that the majority of operations are
> single server, redo logs won't be much larger than they are to-day.

undo to redo? even longer recovery?

thanks, Alex
Alex Zhuravlev
2008-Dec-22 14:45 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> All in all, I have a feeling that _all_ such algorithms have similar
> communication overhead for the `usual' workload.

not sure what you meant by "such" and "usual", but dependency-based recovery may have zero overhead for a "usual local" workload, in terms of network and disk traffic.

thanks, Alex
Nikita Danilov
2008-Dec-22 17:15 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > > I find this relying on explicit request (lock in this case) as a disadvantage:
 > > > lock can be taken long before reintegration meaning epoch might be pinned for
 > >
 > > Hm.. a lock doesn't pin an epoch in any way.
 >
 > well, I think it does, as you don't want to use an epoch received a few minutes ago
 > with a lock.

What is the problem with this?

 > > Locks are only needed to make proof of S2 possible. Once lockless
 > > operation or SNS guarantee in some domain-specific way that no epoch can
 > > depend on a future one, we are fine.
 >
 > well, I guess "in some domain-specific way" means another complexity.

Any IO mechanism has to guarantee that operations are "serializable", that is, that no circular dependencies exist. This is what global epochs need; they don't depend on DLM per se.

 > > I don't understand what is meant by "maintaining an epoch" here. Epoch
 > > is just a number. Surely a client will keep in its memory (in the redo
 > > log) a list of updates tagged by multiple epochs, but I don't see any
 > > problem with this.
 >
 > the problem is that with out-of-order epochs sent to different servers client can't
 > use notion of "last_committed" anymore.

What do you mean by "out of order" here?

 > > > I think having SC is also drawback:
 > > > 1) choosing such node is additional complexity and delay
 > > > 2) failing of such node would need global resend of states
 > > > 3) many unrelated nodes can get stuck due to large redo logs
 > >
 > > As I pointed out, only the simplest `1-level star' form of the stability
 > > algorithm was described, for simplicity. This algorithm is amenable to
 > > a lot of optimization, because it, in effect, has to find a running
 > > minimum in a distributed array, and this can be done in a scalable way:
 >
 > the bad thing, IMHO, in all this is that all nodes making the decision must
 > understand the topology. a server should separate epochs from different clients;
 > it's hard to send batches via some intermediate server/node.

Hm.. I would think that this is very easy, thanks to the good properties of the minimum function (associativity, commutativity, etc.): a client piggy-backs its earliest volatile epoch on any message it sends to any server, and a server batches these data from clients and forwards them to SC.

 > > Note that this requires _no_ additional rpcs from the clients.
 >
 > disagree. at least for distributed operations the client has to report its non-volatile
 > epoch from time to time. in some cases we can use a protocol like ping, in some - not.

I agree with this, but I am not sure this is a problem. If a client is idle for seconds, pinging is not a big deal.

 > > > given current epoch can be advanced by lock enqueue, client can get many used
 > > > epochs at same time, thus we'd have to track them all in the protocol.
 > >
 > > I am not sure I understand this. _Any_ message (including lock enqueue,
 > > REINT, MIN_VOLATILE, CONNECT, EVICT, etc.) potentially updates the epoch
 > > of the receiving node.
 >
 > correct, this means the client may have many epochs to track. thus no last_committed
 > anymore.

Precisely the contrary: the MIN_VOLATILE message returns something equivalent to a cluster-wide global last_committed.

 > undo to redo? even longer recovery?

No, redo to undo. :-)

 > thanks, Alex

Nikita.
Alex Zhuravlev
2008-Dec-22 17:36 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> > well, I think it does, as you don't want to use an epoch received a few minutes ago
> > with a lock.
>
> What is the problem with this?

the problem is that this epoch may hold up lots of other epochs? this may be especially important for fsync(2) or any synchronous request.

> Any IO mechanism has to guarantee that operations are "serializable",
> that is, that no circular dependencies exist. This is what global epochs
> need; they don't depend on DLM per se.

global epochs depend on DLM as a transport to refresh epochs. at least the idea, AFAIU, is to use LDLM RPC to carry the epoch protocol. otherwise it'd need a separate RPC. I'm just saying that there are cases, probably important, when such an explicit RPC will be needed, probably in a nearly-sync manner. I think this is also additional complexity.

> > the problem is that with out-of-order epochs sent to different servers client can't
> > use notion of "last_committed" anymore.
>
> What do you mean by "out of order" here?

epoch N+1 can be committed by mds1 before epoch N is committed by mds2. each such epoch is to be tracked separately and "last_committed" can't be used, I think. additional complexity in the protocol.

> > the bad thing, IMHO, in all this is that all nodes making the decision must
> > understand the topology. a server should separate epochs from different clients;
> > it's hard to send batches via some intermediate server/node.
>
> Hm.. I would think that this is very easy, thanks to the good properties
> of the minimum function (associativity, commutativity, etc.): a client
> piggy-backs its earliest volatile epoch on any message it sends to any
> server, and a server batches these data from clients and forwards them to
> SC.

1) if an epoch isn't bound to some node, then it can also be hard to push epochs to implement fsync(2)
2) batching means additional delay

> I agree with this, but I am not sure this is a problem. If a client is
> idle for seconds, pinging is not a big deal.

I tend to think ping can be a problem at a proper scale. I wouldn't rely on this.

> Precisely the contrary: the MIN_VOLATILE message returns something
> equivalent to a cluster-wide global last_committed.

you meant the "from SC" direction. but before that the client has to track local committedness of each epoch to the servers.

thanks, Alex
Nikita Danilov
2008-Dec-22 18:57 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
 > global epochs depend on DLM as a transport to refresh epochs. at least the idea, AFAIU,
 > is to use LDLM RPC to carry the epoch protocol. otherwise it'd need a separate RPC. I'm just

Any message is used as a transport for epochs, including any reply from a server. So a typical scenario would be

        client                          server
        epoch = 8                       epoch = 9

        LOCK    --------------->
                <---------------        REPLY
        epoch = 9
                                        <----- some other message with
                                               epoch = 10 from somewhere
                                        epoch = 10
        ....

        REINT   --------------->
                <---------------        REPLY
        epoch = 10
                                        <----- some other message with
                                               epoch = 11 from somewhere
                                        epoch = 11

        REINT   --------------->
                <---------------        REPLY
        epoch = 11

        etc.

Note that nothing prevents the server from increasing its local epoch before replying to every reintegration (this was mentioned in the original document as an "extreme case"). With this policy there is never more than one reintegration on a given client in a given epoch, and we can indeed implement the stability algorithm without clients.

 > saying that there are cases, probably important, when such an explicit RPC will be needed,
 > probably in a nearly-sync manner. I think this is also additional complexity.

DLM plays no special role in the epochs mechanism. All that it is used for is to guarantee that conflicting operations are executed in the proper order (i.e., the epoch of a dependent operation is never less than the epoch of an operation it depends on), but this is what DLM is for, and this has to be guaranteed anyway.

 > > > the problem is that with out-of-order epochs sent to different servers client can't
 > > > use notion of "last_committed" anymore.
 > >
 > > What do you mean by "out of order" here?
 >
 > epoch N+1 can be committed by mds1 before epoch N is committed by mds2. each such
 > epoch is to be tracked separately and "last_committed" can't be used, I think.

last_committed can be and has to be used. When a client reintegrated an operation OP = (U(0), ..., U(N)), it counts this operation as `volatile' until all N servers reported (through the usual last_committed mechanism, as it is used by Lustre currently) that all updates have committed.

 > you meant the "from SC" direction. but before that the client has to track local
 > committedness of each epoch to the servers.

Yes, and it can use the last_committed of each server to do this.

 > thanks, Alex

Nikita.
Alex Zhuravlev
2008-Dec-23 06:44 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> Any message is used as a transport for epochs, including any reply
> from a server. So a typical scenario would be

I agree, but I think there will be cases with no messages at all. like WBC doing a flush every few minutes and then going idle. depending on the workload this may introduce additional network overhead on any node.

> etc. Note that nothing prevents the server from increasing its local epoch
> before replying to every reintegration (this was mentioned in the
> original document as an "extreme case"). With this policy there is never
> more than one reintegration on a given client in a given epoch, and we
> can indeed implement the stability algorithm without clients.

hmm? if it's only the client that is aware of the parts of a distributed transaction, how can we?

> DLM plays no special role in the epochs mechanism. All that it is used
> for is to guarantee that conflicting operations are executed in the
> proper order (i.e., the epoch of a dependent operation is never less than
> the epoch of an operation it depends on), but this is what DLM is for,
> and this has to be guaranteed anyway.

conflict resolution can be delegated to some different mechanism when STL takes place.

> last_committed can be and has to be used. When a client reintegrated an
> operation OP = (U(0), ..., U(N)), it counts this operation as `volatile'
> until all N servers reported (through the usual last_committed
> mechanism, as it is used by Lustre currently) that all updates have
> committed.

yup. at some point I got to think you're going to use epochs instead of transno in last_committed, which could be a problem.

just to list my observations about global epochs:
* it's a problem to implement synchronous operations
* network overhead even with local-only changes, depending on workload
* disk overhead even with local-only changes
* SC is a single point of failure with any topology, as it's the only place to find the final minimum
* tree reduction isn't an obvious thing, because a client can't report its minimum to just any node; instead the tree is a rather static thing and any change should be done very carefully. otherwise it's very easy to lose the minimum

thanks, Alex
Nikita Danilov
2008-Dec-23 10:00 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
 > Nikita Danilov wrote:
 > > Any message is used as a transport for epochs, including any reply
 > > from a server. So a typical scenario would be
 >
 > I agree, but I think there will be cases with no messages at all.
 > like WBC doing a flush every few minutes and then going idle. depending
 > on the workload this may introduce additional network overhead on any node.

Indeed, but in any such case an additional null rpc won't harm. In fact, no node should sit isolated for minutes with something in its cache, as it can miss a recovery.

 > > etc. Note that nothing prevents the server from increasing its local epoch
 > > before replying to every reintegration (this was mentioned in the
 > > original document as an "extreme case"). With this policy there is never
 > > more than one reintegration on a given client in a given epoch, and we
 > > can indeed implement the stability algorithm without clients.
 >
 > hmm? if it's only the client that is aware of the parts of a distributed transaction,
 > how can we?

If we have no more than 1 reintegration in a given epoch on a given client, then the server that received an OP = (U(0), ..., U(N)) in epoch E from a client can send to SC a message telling it that this client contains N volatile updates in epoch E, and whenever some server commits one of the U's it sends to SC a message asking it to decrease a counter for this client. The most obvious implementation will batch these notifications, i.e., when a server commits a transaction group it notifies SC about all changes in one message. I personally don't think that is the best approach.

 > > DLM plays no special role in the epochs mechanism. All that it is used
 > > for is to guarantee that conflicting operations are executed in the
 > > proper order (i.e., the epoch of a dependent operation is never less than
 > > the epoch of an operation it depends on), but this is what DLM is for,
 > > and this has to be guaranteed anyway.
 >
 > conflict resolution can be delegated to some different mechanism when STL takes place.

Yes, and this mechanism (if it is correct at all) will guarantee that an epoch cannot depend on a future epoch.

 > just to list my observations about global epochs:
 > * it's a problem to implement synchronous operations
 > * network overhead even with local-only changes, depending on workload
 > * disk overhead even with local-only changes
 > * SC is a single point of failure with any topology, as it's the only place to
 >   find the final minimum
 > * tree reduction isn't an obvious thing, because a client can't report its minimum
 >   to just any node; instead the tree is a rather static thing and any change should
 >   be done very carefully. otherwise it's very easy to lose the minimum

Unfortunately, as far as I know, no other solution was described with a level of detail sufficient to compare. :-)

 > thanks, Alex

Nikita.
Alex Zhuravlev
2008-Dec-23 10:21 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> If we have no more than 1 reintegration in a given epoch on a given
> client, then the server that received an OP = (U(0), ..., U(N)) in epoch
> E from a client can send SC a message telling it that this client holds
> N volatile updates in epoch E, and whenever some server commits one of
> the U's it sends SC a message asking it to decrease the counter for this
> client. The most obvious implementation will batch these notifications,
> i.e., when a server commits a transaction group it notifies SC about all
> changes in one message. I personally don't think that is the best
> approach.

essentially this is very similar to dependency-based recovery, but with
none of its advantages, and with SC tracking all states and being a single
point of failure. I think we need a more scalable solution.

> Yes, and this mechanism (if it is correct at all) will guarantee that an
> epoch cannot depend on a future epoch.

again, it's not about dependency, it's about the network overhead of
global epochs.

> > just to list my observations about global epochs:
> > * it's a problem to implement synchronous operations
> > * network overhead even with local-only changes, depending on workload
> > * disk overhead even with local-only changes
> > * SC is a single point of failure with any topology, as it's the only
> >   place to find the final minimum
> > * tree reduction isn't an obvious thing, because a client can't report
> >   its minimum to an arbitrary node; instead the tree is a rather static
> >   thing and any change should be done very carefully, otherwise it's
> >   very easy to lose the minimum
>
> Unfortunately, as far as I know, no other solution was described with a
> level of detail sufficient to compare. :-)

I could say the same about tree reduction, for example ;) dependency-based
recovery was discussed in many details, I think, and the benefits are very
clear, IMHO, as well as the overall simplicity due to a local
implementation (compared with an implementation involving all nodes in a
cluster).

thanks, Alex
Nikita Danilov
2008-Dec-23 11:06 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
> > If we have no more than 1 reintegration in a given epoch on a given
> > client, then the server that received an OP = (U(0), ..., U(N)) in
> > epoch E from a client can send SC a message telling it that this client
> > holds N volatile updates in epoch E, and whenever some server commits
> > one of the U's it sends SC a message asking it to decrease the counter
> > for this client. The most obvious implementation will batch these
> > notifications, i.e., when a server commits a transaction group it
> > notifies SC about all changes in one message. I personally don't think
> > that is the best approach.
>
> essentially this is very similar to dependency-based recovery, but with
> none of its advantages, and with SC tracking all states and being a
> single point of failure. I think we need a more scalable solution.

We are talking about a few megabytes of data in the network or in memory.
It's easy to replicate this state.

> > Yes, and this mechanism (if it is correct at all) will guarantee that
> > an epoch cannot depend on a future epoch.
>
> again, it's not about dependency, it's about the network overhead of
> global epochs.

Again, global epochs do not depend on DLM to propagate epochs. E.g.,
lockless IO can be implemented without any additional rpcs.

> > > just to list my observations about global epochs:
> > > * it's a problem to implement synchronous operations
> > > * network overhead even with local-only changes, depending on workload
> > > * disk overhead even with local-only changes
> > > * SC is a single point of failure with any topology, as it's the only
> > >   place to find the final minimum
> > > * tree reduction isn't an obvious thing, because a client can't
> > >   report its minimum to an arbitrary node; instead the tree is a
> > >   rather static thing and any change should be done very carefully,
> > >   otherwise it's very easy to lose the minimum
> >
> > Unfortunately, as far as I know, no other solution was described with a
> > level of detail sufficient to compare. :-)
>
> I could say the same about tree reduction, for example ;)

Tree reduction is but an optimization. I am pretty convinced that the core
algorithm works, because this can be proved.

> dependency-based recovery was discussed in many details, I think.

Let's see...

> * when a client issues a transaction it labels it with a unique id
> * a server executing an operation atomically writes an undo record with:
>   * VBR versions, so that we can build chains of really dependent
>     operations
>   * the unique transaction id generated by the client
>   * the number of servers involved in the transaction
> * periodically servers exchange their committed unique transaction ids
>   (only distributed transactions are involved in this)
> * once some distributed transaction is committed on all involved servers,
>   we can prune it and all its local successors

Either I am misunderstanding this, or this is not correct, because not
only a given operation, but also all operations it depends on have to be
committed, and it is not clear how this is determined.

One reason I wrote so lengthy a text was that I wanted to spell out
everything explicitly and unambiguously (and obviously failed in the
latter, as the ensuing discussion has shown).

Nikita.
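
[An illustrative sketch, not part of the original message.] For
concreteness, the undo record implied by the quoted dependency-based scheme
might carry roughly these fields; the names are made up, not taken from any
existing implementation:

    #include <stdint.h>

    #define MAX_VBR_VERSIONS 4

    struct undo_record {
            uint64_t op_id;          /* unique id generated by the client  */
            uint32_t nr_servers;     /* servers involved in the operation  */
            uint32_t nr_versions;
            uint64_t vbr_version[MAX_VBR_VERSIONS];
                                     /* pre-operation versions of the
                                      * objects touched, used to chain
                                      * really dependent operations        */
            uint64_t transno;        /* local transno of this update       */
    };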
Alex Zhuravlev
2008-Dec-23 11:31 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> We are talking about a few megabytes of data in the network or in memory.
> It's easy to replicate this state.

I disagree - the whole state can be distributed over 100K and more nodes,
and some operations may need all nodes to communicate their state. This is
especially a problem with a lossy network.

> Again, global epochs do not depend on DLM to propagate epochs. E.g.,
> lockless IO can be implemented without any additional rpcs.

sorry, I said nothing about DLM. I said "additional RPC", which is
required in some cases. ping, for example, can issue an RPC once per 60s.
moreover, ping can also use a tree or some different topology, making
epoch refresh more complex.

> Tree reduction is but an optimization. I am pretty convinced that the
> core algorithm works, because this can be proved.

sorry, "works" doesn't always mean "meets requirements". In our case
scalability is the top one, and in this regard I don't see how this model
can work well with synchronous operations. At the same time it was stated
that we have to support such operations well, e.g. for nfs exports. I also
tried to point out a few overheads in the algorithm.

> > * once some distributed transaction is committed on all involved
> >   servers, we can prune it and all its local successors
>
> Either I am misunderstanding this, or this is not correct, because not
> only a given operation, but also all operations it depends on have to be
> committed, and it is not clear how this is determined.

the algorithm works starting from the oldest operations and discards them
when there is no undo record before this one.

> One reason I wrote so lengthy a text was that I wanted to spell out
> everything explicitly and unambiguously (and obviously failed in the
> latter, as the ensuing discussion has shown).

yes, it's a well written and proven thing. The point is different - if it
is clear that in some cases it doesn't work well (see the sync
requirement), what does the proof give us?

thanks, Alex
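
[An illustrative sketch, not part of the original message.] The `start from
the oldest operations' rule can be made concrete as follows, with
hypothetical names: undo records are discarded strictly from the head of
the log, and pruning stops at the first record whose operation is not yet
committed everywhere:

    #include <stdbool.h>
    #include <stddef.h>

    struct undo_entry {
            bool committed_everywhere;  /* committed on all involved servers */
            /* ... undo data ... */
    };

    struct undo_log {
            struct undo_entry *entries; /* ordered, oldest first */
            size_t             nr;
    };

    static size_t undo_log_prune(const struct undo_log *log)
    {
            size_t pruned = 0;

            /* an entry is discarded only while nothing older remains */
            while (pruned < log->nr &&
                   log->entries[pruned].committed_everywhere)
                    pruned++;

            /* entries [0, pruned) can be cancelled; the rest must stay */
            return pruned;
    }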
Nikita Danilov
2008-Dec-23 12:50 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
> > We are talking about a few megabytes of data in the network or in
> > memory. It's easy to replicate this state.
>
> I disagree - the whole state can be distributed over 100K and more nodes,
> and some operations may need all nodes to communicate their state. This
> is especially a problem with a lossy network.

The question was about SC being the single point of failure. This can be
eliminated by replicating stability messages to a few nodes.

> > Tree reduction is but an optimization. I am pretty convinced that the
> > core algorithm works, because this can be proved.
>
> sorry, "works" doesn't always mean "meets requirements". In our case
> scalability is the top one, and in this regard I don't see how this model
> can work well with

But "works" always means at least "meets requirements". There is no such
thing as an efficient (or scalable) but incorrect program. Ordinary Lustre
recovery was implemented years ago and it still has problems. I bet it
looked very easy in the beginning, so it was tempting to optimize it.

> > > * once some distributed transaction is committed on all involved
> > >   servers, we can prune it and all its local successors
> >
> > Either I am misunderstanding this, or this is not correct, because not
> > only a given operation, but also all operations it depends on have to
> > be committed, and it is not clear how this is determined.
>
> the algorithm works starting from the oldest operations and discards them
> when there is no undo record before this one.

So let's suppose we have four servers and three operations:

            S0      S1      S2      S3
    OP0     U1      U2
    OP1             U3      U4
    OP2                     U5      U6

Where `U?' means that a given operation sent an update to a given server,
and all updates happen to be conflicting.

Suppose that transaction groups with these updates commit at the same time
and servers are ready to send information to each other. What information
does each server send, and where?

> > One reason I wrote so lengthy a text was that I wanted to spell out
> > everything explicitly and unambiguously (and obviously failed in the
> > latter, as the ensuing discussion has shown).
>
> yes, it's a well written and proven thing. The point is different - if it
> is clear that in some cases it doesn't work well (see the sync
> requirement), what does the proof give us?

It assures you that it _works_. Maybe sub-optimally, but it does. A
program that is lightning fast, consumes zero memory and scales across the
galaxy is useless if it is incorrect.

> thanks, Alex

Nikita.
Alex Zhuravlev
2008-Dec-23 13:11 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> The question was about SC being the single point of failure. This can be
> eliminated by replicating stability messages to a few nodes.

more complexity to work around the initial problem?

> But "works" always means at least "meets requirements". There is no such
> thing as an efficient (or scalable) but incorrect program. Ordinary
> Lustre recovery was implemented years ago and it still has problems. I
> bet it looked very easy in the beginning, so it was tempting to optimize
> it.

then we can just proceed with synchronous IO if scalability isn't a
requirement. And BKL is much better because of its simplicity.

> So let's suppose we have four servers and three operations:
>
>             S0      S1      S2      S3
>     OP0     U1      U2
>     OP1             U3      U4
>     OP2                     U5      U6
>
> Where `U?' means that a given operation sent an update to a given server,
> and all updates happen to be conflicting.
>
> Suppose that transaction groups with these updates commit at the same
> time and servers are ready to send information to each other. What
> information does each server send, and where?

I'll prepare a detailed description in a separate mail.

> > yes, it's a well written and proven thing. The point is different - if
> > it is clear that in some cases it doesn't work well (see the sync
> > requirement), what does the proof give us?
>
> It assures you that it _works_. Maybe sub-optimally, but it does. A
> program that is lightning fast, consumes zero memory and scales across
> the galaxy is useless if it is incorrect.

interesting point. It sounds like it's absolutely impossible to prove
(somehow) another approach. Does having something "proved" mean we
shouldn't try another approach to avoid sub-optimal but important cases?

thanks, Alex
Nikita Danilov
2008-Dec-23 13:24 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
> > The question was about SC being the single point of failure. This can
> > be eliminated by replicating stability messages to a few nodes.
>
> more complexity to work around the initial problem?

More optional optimizations that are easy to implement later, should they
prove necessary.

> > But "works" always means at least "meets requirements". There is no
> > such thing as an efficient (or scalable) but incorrect program.
> > Ordinary Lustre recovery was implemented years ago and it still has
> > problems. I bet it looked very easy in the beginning, so it was
> > tempting to optimize it.
>
> then we can just proceed with synchronous IO if scalability isn't a
> requirement. And BKL is much better because of its simplicity.

Precisely. If Linus had decided to do the initial Linux SMP implementation
based on fine-grained locking, the Linux kernel would have been as... some
other Free Beautifully Scalable kernel with a Daemon (slow, un-scalable,
and buggy). :-)

> > Suppose that transaction groups with these updates commit at the same
> > time and servers are ready to send information to each other. What
> > information does each server send, and where?
>
> I'll prepare a detailed description in a separate mail.

Thanks.

> > > yes, it's a well written and proven thing. The point is different -
> > > if it is clear that in some cases it doesn't work well (see the sync
> > > requirement), what does the proof give us?
> >
> > It assures you that it _works_. Maybe sub-optimally, but it does. A
> > program that is lightning fast, consumes zero memory and scales across
> > the galaxy is useless if it is incorrect.
>
> interesting point. It sounds like it's absolutely impossible to prove
> (somehow) another approach. Does having something "proved" mean we
> shouldn't try another approach to avoid sub-optimal but important cases?

We definitely should try, but I think a much, much more formal and
rigorous treatment than we are accustomed to is necessary for such a
fundamental thing as recovery.

> thanks, Alex

Nikita.
Andreas Dilger
2008-Dec-23 23:37 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita,
I still need more time to re-read and digest what you have written, but
thanks in advance for taking the time to explain it clearly and precisely.
This algorithm does seem to be related to the one originally described in
Peter's "Cluster Metadata Recovery" paper, where the epoch numbers are
pushed and replied by every request, but it is much better described.

I think what would help me understand it a bit easier is if it could be
more closely mapped onto a potential implementation, and the issues we may
see there. For example, the issue with fsync possibly involving all? nodes
(including clients) is not obvious from your description.

Similarly, some description of the practical requirements for message
exchange, how easy/hard it would be to e.g. "find all undo records related
to...", and the practical bound on the number of operations that might
have to be kept in memory and/or rolled back/forward during recovery would
be useful.

In particular, the mention that clients need to participate to determine
the oldest uncommitted operation seems troublesome, unless the servers
themselves can place a bound on this by the frequency of their commits.

On Dec 22, 2008 21:57 +0300, Nikita Danilov wrote:
> Any message is used as a transport for epochs, including any reply
> from a server. So a typical scenario would be
>
>         client                  server
>         epoch = 8               epoch = 9
>
>         LOCK --------------->
>         <-------------- REPLY
>         epoch = 9
>         <----- other message with epoch = 10 from somewhere
>         epoch = 10
>         ....
>
>         REINT --------------->
>         <-------------- REPLY
>         epoch = 10
>
>         <----- other message with epoch = 11 from somewhere
>         epoch = 11
>
>         REINT --------------->
>         <-------------- REPLY
>         epoch = 11
>
> etc. Note that nothing prevents a server from increasing its local epoch
> before replying to every reintegration (this was mentioned in the
> original document as an "extreme case"). With this policy there is never
> more than one reintegration on a given client in a given epoch, and we
> can indeed implement the stability algorithm without clients.

I was wondering if we could make some analogies between the current
transno-based recovery system and your current proposal. For example, in
our current recovery we increment the transno on the server before the
reply for every reintegration, and due to single-RPC-in-flight to the
client it could be considered in a separate "epoch" for every RPC, to
match your "extreme case" above.

Similarly, I wonder if we could somehow map client (lack of) involvement
in epochs to our current configuration, and only require "client"
participation in the case of WBC or CMD?

One thing that crossed my mind at this point is that the 1.8 servers
already track recovery "epochs" for VBR using the transno (the epoch is in
the high 32-bit word of the transno, the counter is in the low 32-bit
word). These "recovery epochs" are not (currently) synchronized between
servers, but that would seem to be possible/needed in the future.

Alternately, we might consider the VBR recovery "epochs" to be the same as
the epochs you are proposing, and the transno increment does not affect
these epochs except to order operations within the epoch. We would
increment these epochs periodically (either due to too many operations, or
a time limit).

The current VBR epochs only make up 32 bits of the transno, but we might
consider increasing the size of this epoch field to allow more epochs. If
we need to do that, it should preferably be done ASAP before the 1.8.0
release is made (this would be a trivial change at this stage).
Cheers, Andreas
--
Andreas Dilger
Sr. Staff Engineer, Lustre Group
Sun Microsystems of Canada, Inc.
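
[An illustrative sketch, not part of the original message.] The 1.8 VBR
transno layout described above - recovery epoch in the high 32 bits, a
per-epoch counter in the low 32 bits - packs and unpacks like this; the
helper names are made up for illustration:

    #include <stdint.h>

    static inline uint64_t transno_make(uint32_t epoch, uint32_t counter)
    {
            return ((uint64_t)epoch << 32) | counter;
    }

    static inline uint32_t transno_epoch(uint64_t transno)
    {
            return (uint32_t)(transno >> 32);
    }

    static inline uint32_t transno_counter(uint64_t transno)
    {
            return (uint32_t)(transno & 0xffffffffULL);
    }

    /*
     * Within one epoch the counter orders operations; bumping the epoch
     * (periodically, or after too many operations) starts a new recovery
     * epoch while keeping transnos monotonically increasing.
     */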
Alex Zhuravlev
2008-Dec-24 10:32 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Hello,

Nikita Danilov wrote:
> So let's suppose we have four servers and three operations:
>
>             S0      S1      S2      S3
>     OP0     U1      U2
>     OP1             U3      U4
>     OP2                     U5      U6
>
> Where `U?' means that a given operation sent an update to a given server,
> and all updates happen to be conflicting.
>
> Suppose that transaction groups with these updates commit at the same
> time and servers are ready to send information to each other. What
> information does each server send, and where?

instead of digging right into the details, let's agree about a few simple
statements the idea is based on:

(0) an operation is globally committed if no operation it depends on can
    be aborted

(1) some external mechanism orders operations and updates (e.g. LDLM,
    local locking, etc.)

(2) if update U1 executed before update U2 and U2 is committed, then U1
    must be committed

(3) requirement: if operation O2 depends on operation O1, then O1 has a
    conflicting update on the same server as O2

    example 1: mkdir /a; touch /a/b
    mkdir consists of two updates: U1 - create the object on mds1, U2 -
    create the directory entry on mds2. touch consists of a single update:
    U3 - create the object and the directory entry in /a, both on mds1.
    U1 and U3 will be conflicting as they touch the same object

(4) an operation is globally committed if all updates this operation
    consists of are committed and everything it depends on is committed as
    well

explanation: say operation O consists of two updates U1 (server S1) and U2
(server S2). Let's say U1 depends on Ua on server S1 and U2 depends on Ub
on server S2. We stated that any update O can depend on is already
executed, due to (1). Thus Ua is already executed, and Ub is already
executed as well. Due to (2), commit of U1 means commit of Ua, and commit
of U2 means commit of Ub.

Thus the direct dependency is resolved. If there is any indirect
dependency, it is resolved the same way due to (4).

In the example above, commit of U5 means commit of U4, and the same for U3
and U2. IOW, when U3 and U4 are committed, then we can consider OP1
globally committed (it won't be aborted).

any objections?

thanks, Alex
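
[An illustrative sketch, not part of the original message.] One way to make
the `globally committed' test concrete - with the recursive reading that
the follow-up message insists on - is a naive walk over the dependency
graph. Names are hypothetical; a real implementation would memoize results
and bound the recursion:

    #include <stdbool.h>

    struct op {
            int               nr_updates;
            const bool       *update_committed;  /* one flag per update      */
            int               nr_deps;
            const struct op **deps;              /* operations it depends on */
    };

    static bool op_globally_committed(const struct op *op)
    {
            for (int i = 0; i < op->nr_updates; i++)
                    if (!op->update_committed[i])
                            return false;     /* some update still volatile */

            for (int i = 0; i < op->nr_deps; i++)
                    if (!op_globally_committed(op->deps[i]))
                            return false;     /* a dependency may be undone */

            return true;
    }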
Nikita Danilov
2008-Dec-24 11:37 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Alex Zhuravlev writes:
> Hello,
>
> Nikita Danilov wrote:
> > So let's suppose we have four servers and three operations:
> >
> >             S0      S1      S2      S3
> >     OP0     U1      U2
> >     OP1             U3      U4
> >     OP2                     U5      U6
> >
> > Where `U?' means that a given operation sent an update to a given
> > server, and all updates happen to be conflicting.
> >
> > Suppose that transaction groups with these updates commit at the same
> > time and servers are ready to send information to each other. What
> > information does each server send, and where?
>
> instead of digging right into the details, let's agree about a few simple
> statements the idea is based on:
>
> (0) an operation is globally committed if no operation it depends on can
>     be aborted

... and all updates of the operation itself are committed on the
respective servers.

> (1) some external mechanism orders operations and updates (e.g. LDLM,
>     local locking, etc.)

Agree.

> (2) if update U1 executed before update U2 and U2 is committed, then U1
>     must be committed

I think this is only valid when U1 and U2 are on the same server. And even
in this case this is probably required only when U1 and U2 are
conflicting.

> (3) requirement: if operation O2 depends on operation O1, then O1 has a
>     conflicting update on the same server as O2

Agree, provided that `depends' means `directly depends', i.e., not through
some intermediate operation.

> example 1: mkdir /a; touch /a/b
> mkdir consists of two updates: U1 - create the object on mds1, U2 -
> create the directory entry on mds2. touch consists of a single update:
> U3 - create the object and the directory entry in /a, both on mds1.
> U1 and U3 will be conflicting as they touch the same object
>
> (4) an operation is globally committed if all updates this operation
>     consists of are committed and everything it depends on is committed
>     as well

I think this is wrong. Everything it depends on must be _globally_
(recursively) committed as well. Otherwise, in the following scenario

    mkdir /a
    mkdir /a/b
    touch /a/b/f

the file creation depends on mkdir /a/b only, but touch is not globally
committed when all updates of mkdir /a/b are committed, because mkdir /a
might still be rolled back.

As a note, I tried very hard to avoid confusion by using different terms:
operations (a distributed state update) vs. transactions (a group of
updates on a given server that reaches persistent storage atomically), and
`stabilizes' vs. `commits' respectively.

> explanation: say operation O consists of two updates U1 (server S1) and
> U2 (server S2). Let's say U1 depends on Ua on server S1 and U2 depends
> on Ub on server S2. We stated that any update O can depend on is already
> executed, due to (1). Thus Ua is already executed, and Ub is already
> executed as well. Due to (2), commit of U1 means commit of Ua, and
> commit of U2 means commit of Ub.
>
> Thus the direct dependency is resolved. If there is any indirect
> dependency, it is resolved the same way due to (4).
>
> In the example above, commit of U5 means commit of U4, and the same for
> U3 and U2. IOW, when U3 and U4 are committed, then we can consider OP1
> globally committed (it won't be aborted).

Err.. what if U3 and U4 are committed on S1 and S2, but S0 hasn't received
U1 at all (e.g., U1 is an inode creation that was executed without a lock
and the client failed), or U1 was executed but not committed and S0
failed? It seems that OP0 will have to be rolled back, and hence OP1 and
OP2 cannot be considered globally committed^W^Weverywhere stable?

> any objections?

I was more interested in how batching is implemented and, specifically, at
what moment a server can actually remove an entry from the undo log (i.e.,
before or after it sends a batch, etc.), because it looks to me that
server agreement on what operations are everywhere stable requires, in the
general case, a two-phase commit or some other atomic commitment protocol.

> thanks, Alex

Nikita.
Eric Barton
2008-Dec-24 12:35 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Andreas, Nikita, Alex,

We will go through this in detail at the tech leads meeting in Beijing.

I think I am beginning to understand Nikita's proposal, and I think it
helps to adopt his use of "operation" (rename, mkdir etc.) and "update"
(the part of an operation executed on a single server).

I believe it would be especially useful if we could finish working through
the previous proposal too - then we would start to understand the
similarities and differences, and that in turn would allow us to make
better critical judgments overall - e.g. what is the volume and pattern of
additional message passing required for distributed operations, what are
the expected sizes of undo/redo logs, how does aggregation designed to
mitigate these issues affect latency, etc.

A major concern I have with whatever scheme we finally adopt is how to
ensure the performance of synchronous metadata operations (as required by
NFS) isn't completely hosed. With CMD, you can only be sure an operation
is stored stably when it can no longer be undone - i.e. when it and all
operations it is transitively dependent on have been committed globally.
Making this fast seems to be in direct opposition to scaling throughput,
so understanding the tradeoff precisely seems essential.

Cheers,
    Eric
Nikita Danilov
2008-Dec-24 16:16 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Andreas Dilger writes:
> Nikita,

Hello,

> I still need more time to re-read and digest what you have written, but
> thanks in advance for taking the time to explain it clearly and
> precisely. This algorithm does seem to be related to the one originally
> described in Peter's "Cluster Metadata Recovery" paper, where the epoch
> numbers are pushed and replied by every request, but it is much better
> described.

thanks.

> I think what would help me understand it a bit easier is if it could be
> more closely mapped onto a potential implementation, and the issues we
> may see there. For example, the issue with fsync possibly involving all?
> nodes (including clients) is not obvious from your description.

I agree with Eric that we can discuss this in more detail in Beijing, and
just want to make one rather obvious remark: your and Alex's concerns
about fsync are indeed justified, because in the global epochs model fsync
is no different from sync, as no per-object dependencies are tracked. On
the other hand, for the target use case of pNFS, where every operation is
synchronous, this is probably less important.

Nikita.
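
[An illustrative sketch, not part of the original message.] The remark that
fsync degenerates into sync can be made concrete with a tiny sketch, using
hypothetical names: with no per-object dependency tracking, the only thing
an fsync can wait for is the cluster-wide stable epoch reaching the epoch
of the object's last operation:

    #include <stdint.h>

    /* highest epoch known to be stable everywhere, and a way to wait for it */
    extern uint64_t stable_epoch;
    extern void wait_for_stable_epoch(uint64_t epoch);

    struct object {
            uint64_t last_op_epoch;  /* epoch of the last reintegration
                                      * that touched this object          */
    };

    static void epoch_fsync(const struct object *obj)
    {
            /*
             * Nothing object-specific can be waited for: every node that
             * may still hold a volatile update in an epoch <= last_op_epoch
             * must report before this returns - effectively a cluster-wide
             * sync.
             */
            wait_for_stable_epoch(obj->last_op_epoch);
    }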
Alex Zhuravlev
2008-Dec-26 09:01 UTC
[Lustre-devel] global epochs [an alternative proposal, long and dry].
Nikita Danilov wrote:
> ... and all updates of the operation itself are committed on the
> respective servers.

yes

> > (2) if update U1 executed before update U2 and U2 is committed, then U1
> >     must be committed
>
> I think this is only valid when U1 and U2 are on the same server. And
> even in this case this is probably required only when U1 and U2 are
> conflicting.

agree about the same server. I think this is the model used by ext3 and
the DMU.

> > (3) requirement: if operation O2 depends on operation O1, then O1 has a
> >     conflicting update on the same server as O2
>
> Agree, provided that `depends' means `directly depends', i.e., not
> through some intermediate operation.

yes

> > (4) an operation is globally committed if all updates this operation
> >     consists of are committed and everything it depends on is committed
> >     as well
>
> I think this is wrong. Everything it depends on must be _globally_
> (recursively) committed as well. Otherwise, in the following scenario

OK, additional clarification: "all updates this operation consists of are
globally committed"

> As a note, I tried very hard to avoid confusion by using different terms:
> operations (a distributed state update) vs. transactions (a group of
> updates on a given server that reaches persistent storage atomically),
> and `stabilizes' vs. `commits' respectively.

I like the terms.

> Err.. what if U3 and U4 are committed on S1 and S2, but S0 hasn't
> received U1 at all (e.g., U1 is an inode creation that was executed
> without a lock and the client failed), or U1 was executed but not
> committed and S0 failed? It seems that OP0 will have to be rolled back,
> and hence OP1 and OP2 cannot be considered globally
> committed^W^Weverywhere stable?

with the fixed definition I think it's correct.

> I was more interested in how batching is implemented and, specifically,
> at what moment a server can actually remove an entry from the undo log
> (i.e., before or after it sends a batch, etc.), because it looks to me
> that server agreement on what operations are everywhere stable requires,
> in the general case, a two-phase commit or some other atomic commitment
> protocol.

then a bit more words. I think the following statement is still true: when
any operation is being executed (its updates are being executed on the
target servers), all updates it depends on are already executed.

let's fix the servers' state at the time our updates begin to execute: S1
is the state on server 1, S2 is the state on server 2, ..., Sn is the
state on server N. Due to (2), once all states S1..Sn are committed, all
dependencies our updates might have are resolved, and they can't be
aborted due to the abort of some previous operation. In practice this
means that, having a series of updates on some server: U1, U2, U3, U4,
..., Un, Un+1, we can choose some N, ask all servers for their last
generated transno (not the last committed transno) and assign that set of
transnos to point N. Once all servers have reported the corresponding
transnos committed, we know that all dependencies U1..Un might have are
resolved and U1..Un can't be aborted. (5)

of course, this is true only for operations with number of updates = 1
(iirc, we call them local operations, in contrast with distributed ones
where the number of updates is > 1). For distributed operations we also
need to make sure all updates are committed. When some server commits an
update and the corresponding operation has 2 or more updates, the server
reports this to the other servers involved in the operation. In practice
the server doesn't report immediately; instead it puts the transaction id
into some batch (or batches) which will be fired later. (6)

now back to the series of updates on a server: U1, U2, U3, U4, ..., Un,
Un+1. In general, each update has its own undo record in the log. The
record for any local update at the beginning of the series can be
cancelled once the corresponding update is locally committed. The record
for any distributed operation's update can be removed from the series, so
that it doesn't hold the remaining records back, but not cancelled. In
order to cancel the undo record for a distributed operation we need to
make sure that during recovery none of the undo records of this operation
can be used, otherwise recovery can be confused by finding the record on
one server but not on another. This can be done with an llog-like
protocol: for any distributed operation, the server with the minimal id
cancels its own undo record and generates another record M marking the
operation globally committed. Then this server notifies the other servers
involved in the operation, they cancel their own undo records, and once
the cancels are committed, record M can be cancelled. (7)

now let's consider that example:

            S0      S1      S2      S3
    OP0     U1      U2
    OP1             U3      U4
    OP2                     U5      U6

let's redraw it a bit ....

    undo series of S0: U1
    undo series of S1: U2 U3
    undo series of S2: U4 U5
    undo series of S3: U6

    S0 reports the commit of U1 in transno T01 (OP0) to S1; now S1 knows U2 depends on S0/T01
    S1 reports the commit of U2 in transno T11 (OP0) to S0; now S0 knows U1 depends on S1/T11
    S1 reports the commit of U3 in transno T12 (OP1) to S2; now S2 knows U4 depends on S1/T12
    S2 reports the commit of U4 in transno T21 (OP1) to S1; now S1 knows U3 depends on S2/T21
    S2 reports the commit of U5 in transno T22 (OP2) to S3; now S3 knows U6 depends on S2/T22
    S3 reports the commit of U6 in transno T31 (OP2) to S2; now S2 knows U5 depends on S3/T31

now each server knows its direct dependencies. Then all of them have to
resolve the global dependency:

    S0 requests the current state from S1, S2, S3 - they return their last generated transno
    S1 requests the current state from S0, S2, S3 --//--
    S2 requests the current state from S0, S1, S3 --//--
    S3 requests the current state from S0, S1, S2 --//--

at some point all servers report the collected transnos committed. Given
that all the updates belong to distributed transactions, the servers can
remove them from their series so that they don't hold a dependency for
anyone, but not cancel them. As noted in (7), we can use the llog-like
protocol to cancel undo records for distributed operations; as they don't
block any operation, we can postpone the cancels for a very long time to
improve bandwidth usage.

I think this *oversimplified* approach demonstrates that we can do
"stabilization" with anywhere-generated-id operations. Messages reporting
commits can be batched. We can even use a bi-directional protocol where
S0, reporting the commit of U1 to S1, gets a reply claiming the commit of
U2 back. Any message can carry the "last generated transno" along with
"last committed", making the "request current state" step unneeded.

One of the important advantages such an approach has is the ability to
implement fsync(2) in a more optimal way, without involving the whole
cluster.

The simplest optimization could be to omit the requests for other servers'
state (see (5)), and the undo records for all local operations, if the
undo log is empty. So, as long as a server doesn't execute global
operations, all local operations are executed with zero "epoch overhead",
like today.

A more advanced approach could include propagation of the involved servers
when they exchange commits of distributed operations (see (6)). Say, if
server S1 has no other distributed operations (and thus doesn't depend on
other servers), then when reporting the commit of update U1 (part of
operation O1) to server S2 it declares a dependency on S1,S2. When S2
reports the commit of some other operation O2 to server S3, it declares a
dependency on S1,S2. Now, when S3 resolves the global dependency (see
(5)), it doesn't request state from all the servers, but only from S1 and
S2. We can go even further and include the last generated transno along
with the server in the report. Then other servers don't even need to
request states, just wait till those servers have those transnos
committed.

an even more advanced approach could be to track the precise dependencies
for any operation. This is not very useful for ldiskfs, as fsync(2)
flushes all pending updates, but with the DMU we could use zlog and flush
only the really required bits.

thanks, Alex
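
[An illustrative sketch, not part of the original message.] The `point N'
barrier from (5) - snapshot every server's last generated transno when the
series U1..Un is cut, then wait until each server reports that transno
committed - could be expressed roughly as below. The names are
hypothetical, and the batching, distributed-update accounting and the
llog-like cancellation of (6)-(7) are left out:

    #include <stdbool.h>
    #include <stdint.h>

    #define NR_SERVERS 4

    struct stability_barrier {
            uint64_t snapshot[NR_SERVERS];  /* last generated transno at point N */
    };

    /* filled in when each server answers the "current state" request */
    static void barrier_snapshot(struct stability_barrier *b,
                                 const uint64_t last_generated[NR_SERVERS])
    {
            for (int i = 0; i < NR_SERVERS; i++)
                    b->snapshot[i] = last_generated[i];
    }

    /*
     * Once this returns true, every dependency that U1..Un might have is
     * already committed somewhere, so U1..Un can no longer be aborted by
     * the abort of an earlier operation.
     */
    static bool barrier_resolved(const struct stability_barrier *b,
                                 const uint64_t last_committed[NR_SERVERS])
    {
            for (int i = 0; i < NR_SERVERS; i++)
                    if (last_committed[i] < b->snapshot[i])
                            return false;
            return true;
    }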
Hello, here is another thought ... whole distributed recovery can be divided into two parts: * "purge" job, before recovery takes place, we write and purge undo records * "undo" job, when recovery takes place and we rollback to some consistent state global epochs do very well "purge job", because of constant overhead. but when it comes to fsync, global epochs do not because to fsync some epoch X, we need to wait till all nodes having unstable epochs Y < X report it to SC and then we have to synchronously write new global stable epoch. it''s especially not very well from reliability point of view - having many nodes to contribute makes this vulnerable for failures. with dependencies we could implement more efficient fsync because all you need is to sync _servers_ holding uncommitted updates for some object and updates it depends on, we don''t need to wait for other nodes, then write some record. IOW, dependencies do well "undo job", because it''s not a global undo of all unlucky operations, but only really inconsistent ones. but dependencies do bad with "purge job", because traffic overhead (in bytes) is order of distributed updates. the question is .... could we use global epochs for "purge" and dependencies for "undo" ? say, updates are tagged with an epoch and unique tag. for regular activity global minimum is found in lazy manner. but during recovery we build chains of dependencies using unique tags and do very selective rollback. thanks, Alex Nikita Danilov wrote:> Hello, > > a few proposals for a distributed recovery for the upcoming CMD release > of Lustre were discussed recently. In my opinion, many of them > (including the clients-epoch approach that I advocated) are very > complex, and a simpler solution, that can be fully understood is > needed. The following is an attempt to provide such a solution. > > Nikita. > > * * * > > This is a strawman proposal. At least it would help us to settle the > terminology. > > The following describes an alternative distributed recovery mechanism. As this > proposal is somewhat radically alternative, exposition is rather abstract, > please bear with it. > > The summary is that the original `global epochs'' [10] proposal is modified to > involve all cluster nodes, including clients. This seeks to fix what is seen > as a major drawback of the said algorithm: its reliance on a master-slave > processing. > > Definitions: > ===========> > A _cluster_ consists of _nodes_. Every node has a volatile storage. Some nodes > have persistent storage. Persistent means `surviving any failure considered in > the model''. > > Nodes exchange _messages_. A message X with a parameter Y, sent from a node N > to a node M is denoted as > > X(Y) : N -> M > > Synchronous message send is denoted as > > X(Y) : N ->> M > > It is, in reality, a sequence > > X(Y) : N -> M > RepX : M -> N > > of a send and a reply. > > Nodes _join_ the cluster, and _part_ from the cluster. A node can be forcibly > parted from the cluster---_evicted_. > > An _operation_ is a `primitive'' distributed modification of state, that moves > distributed state from one consistent state to another consistent > state. `Primitive'' because without such a qualification a valid sequence of > operations would be an operation itself. > > An operation OP consists of _updates_ of a state of every node involved in > this operation: OP = (U(0), ... U(n)), where U(k) is an update for a node > U(k).node. 
> > A _reintegration_ of an operation is a process by which a node (by sending > messages) requests other nodes to _execute_ updates of a given operation, > i.e., to effect corresponding state change in the node storage (volatile or > persistent). Details of reintegration are described below. > > A node with a persistent storage supports _transactions_, which are means to > declare that a sequence of updates, executed in a volatile storage, must reach > persistent storage atomically. > > Two updates are _conflicting_ if their results (including success or failure > indication) and the final state are depending on the order of their > execution. > > For a given update U, a node N can send a message to U.node, requesting a > _lock_ that will delay requests for locks for conflicting updates requested > from other nodes until the lock is either released by another message or when > N leaves the cluster. (In reality locks are taken on objects, but introducing > them would complicate the exposition.) > > Epoch Basics: > ============> > The core epochs algorithm is very simple. > > Every node N keeps in its volatile storage an _epoch number_, denoted > N.epoch. Every message X is tagged with an epoch number that is denoted as > X.epoch. These epoch numbers are maintained according to the following > protocol: > > E1. On receiving X : M -> N, N sets > > N.epoch = max(N.epoch, X.epoch); > > E2. On sending X : N -> M, N sets > > X.epoch = N.epoch; > > Assignments in E1 and E2 must be mutually atomic. Compare this with `Lamport > timestamps'' [1] and `vector clocks'' [2]. > > Progressing toward new epochs will be described later, for now assume that > there are multiple epoch numbers at the same time stored in the node memories > and traversing the network in messages. > > Operations: > ==========> > O1. To reintegrate an operation OP = (U(0), ... U(n)), a node N > > - sends lock requests: LOCK(U(k)) : N ->> U(k).node; > > - sends reintegration messages: REINT(U(k)) : N -> U(k).node > atomically w.r.t. E1. > > - adds U to the volatile `redo log''. > > O1 doesn''t require all LOCK messages to be synchronous and serialized: it''s > only necessary that replies to all LOCK messages are received before first > REINT message is sent. > > We denote REINT(U).epoch as U.epoch (well-defined), and say that update U `is > in the epoch U.epoch'', and that corresponding undo record (see O2) is a record > `in epoch U.epoch''. > > O2. On receiving REINT(U) : M -> N (where N == U.node), > node N transactionally > > - executes U in the volatile storage, and > > - adds to the `undo log'' a record [U, OP] > > Note that U.epoch can be less than N.epoch at the time of > execution (it cannot be greater than the latter due to E1). > > We consider only single-level reintegration, where execution of an update > requires no further reintegrations. Generalization to the multi-level case is > left as an exercise for a curious reader. > > Correctness: > ===========> > We can now prove a number of very simple statements: > > S0: For a node N, N.epoch increases monotonically in time. > > Proof: The only place where N.epoch is modified is E1, and this is > obviously a non-decreasing function. > > S1: A collection of all updates in a given epoch is presicely a collection of > updates for some set of operations (i.e., epoch contains no partial > operations). > > Proof: Obvious from O1: all updates for a given operation are sent in the > same epoch. > > S2: For any sequence of conflicting updates (U{0}, ... 
U{n}), the sequence > (U{0}.epoch, ..., U{n}.epoch) is monotonically increasing. > > Proof: Consider conflicting updates U{k} and U{k+1}. From O1 and the > definition of locking it is immediately clear that the following sequence > of message sends took place: > > LOCK(U{k}) : N -> S ; request a lock for U{k} > RepLOCK(U{k}) : S -> N ; get the lock for U{k} > > (*) REINT(U{k}) : N -> S ; reintegrate U{k} > > LOCK(U{k+1}) : M -> S ; conflicting lock is requested by M > > (*) UNLOCK(U{k}) : N -> S ; N yields the lock > > (*) RepLOCK(U{k+1}) : S -> M ; M get the lock > > (*) REINT(U{k+1}) : M -> S ; reintegrate U{k+1} > > Only ordering of messages marked with (*) matters, the rest is just for > completeness. Then > > U{k}.epoch == REINT(U{k}).epoch ; by definition > <= UNLOCK(U{k}).epoch ; by S0 for N and E2 > <= RepLOCK(U{k+1}) ; by S0 for S and E2 > <= REINT(U{k+1}) ; by S0 for M > == U{k+1}.epoch ; by definition > > In the essence, S2 states that epoch ordering is compatible with the causal > ordering of updates. An important consequence of this is that an epoch cannot > `depend'' on a previous epoch. Note that the proof of S2 is very similar to the > proof of serializability [7] of the database schedules under the two-phase > locking (2PL) protocol [3]. > >>From S0, S1 and S2 it seems very plausible to conclude that > > S3: For any epoch E, a collection of updates in all epochs up to and including > E is presicely a collection of updates in some prefix of execution > history. That is, for every node N, said collection contains updates from all > operations reintegrated by N before some moment T in N''s physical time, and no > updates from operations reintegrated by N after T. Alternatively, `an epoch > boundary is a consistent state snapshot''. > > We won''t prove S3, as this requires formalizing the notions of global and > local histories, distributed schedules, etc., which is more formalism than is > tolerable at the moment. > > Intermezzo: > ==========> > S3 is the main weapon in achieving correct distributed recovery: it claims > that restoring the distributed state as of on an epoch boundary results in a > globally consistent state. The key observation is that due to O2 every node > with a persistent storage has enough information to individually restore its > state to the boundary of _any_ epoch, all updates from which it has on its > persistent storage, even in the face of failures. Once all such nodes agreed > on a common epoch number, they restore their state independently. It is this > agreeing on a single number instead of agreeing on a common set of updates > that greatly simplifies recovery. > > Advancing epochs: > ================> > So far no way to progress to the next epoch was introduced. If algorithms > described above were ran as is, there would be only one epoch boundary: an > initial file system state (as created by mkfs), and it would be the only point > to which epoch-based recovery could restore the system up to. > > A switch to the next epoch can be initiated by any node N, and is effected by > > E3. N.epoch++; > > That''s all. That is, multiple nodes can advance epochs completely > independently without any communication whatsoever. To understand why this is > sound recall the proof of S3: all it relies on is that epochs monotonically > increase across a chain _dependent_ messages, and to be involved into > dependent operation nodes communicate (through another node perhaps), and > their epoch numbers are synchronized by E1 and E2. 
> > E3 is executed atomically w.r.t. E1 and E2. Note that E3 doesn''t break epoch > monotonicity assumed by S0. > > To speed up announcement of a new epoch, N > > E4. (optionally) sends null messages to some nodes. > > The more, if any, null messages are sent to other nodes, the faster news about > new epoch are spread across the cluster. In the extreme case, N broadcasts > announcement to the whole cluster. Note that there is no synchrony > requirements for the null messages: it is perfectly valid, for example, that N > is still sending them when another node already started sending the next round > of announcements. >