Saving a read-only file edited in vi / vim

We’ve all done it: opened a file using vi or vim to inspect the contents, then realized we needed to alter it.  Of course we totally ignored the message informing us that we didn’t have permission to edit, so we’re only allowed to view it as “read-only”.  Then after we find the troublesome spot we hit “i” and happily edit the place that needed changing, only to “face-palm” when we realize we cannot save the wonderful edit we just made.

In the past I handled this one of three ways: I either copied and pasted the change after reopening the file using sudo, or I reopened the file and retyped everything, or I saved the file as a temp file and then renamed it using sudo.  Very stupid, stressful and time consuming.

However, now I know a better way.  Using a combination of ‘tee’ and ‘sudo’ commands I can now save the read-only file rather than jumping through the hoops in the previous paragraph.  Here is how:

Open a file as normal, forgetting to use “sudo”, and therefore viewing a read-only file.

Then mistakenly try to edit the read-only file in the traditional manner.

But when we try to save using ‘:w!’, ZZ (Shift+Z twice), ‘:wq!’, or whatever combination we normally fall back on, the save fails because the file is read-only.

This is where the new magic can happen. Instead of the normal “face-palm”, we do not just press ENTER and move on. We enter a new command, ‘:w !sudo tee %’ (broken down below), and successfully save the file after entering the sudo password.

At this point we will be presented with the content of the file and a prompt to press ENTER or type another command. To simply save the file and move on, we press ENTER and then press the letter “O” (oh). (NOTE: “L” seems to do pretty much the same thing.)  The file is saved but remains open in vi/vim for more editing or reading.  We can now exit by typing “:q!”, since vi/vim still treats the file as “read-only”.

What the command does:

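  • :w = Write the buffer. When it is followed by ! and a command, vi/vim sends the buffer contents to that command’s standard input instead of writing them to a file.
  • !sudo = Run the shell command that follows through sudo.
  • tee = Reads the buffer contents from standard input and writes them to the named file (it also echoes them back, which is why the file content appears on screen).
  • % = Expands to the current filename.
  • Simply put, the ‘tee’ command is run as sudo and receives what the vi/vim write command sends it, writing it to the current filename.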
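For readers who think better in code, here is a rough Python sketch of the same plumbing: the text goes to the standard input of tee, and tee (optionally run through sudo) is the process that opens the file. The function name `write_via_tee` and the `use_sudo` parameter are made up for this illustration, and it assumes a Unix-like system with `tee` on the PATH.

```python
import subprocess


def write_via_tee(buffer_text, path, use_sudo=False):
    """Pipe buffer_text into `tee path`, roughly what `:w !sudo tee %` does.

    Hypothetical helper for illustration only; it is not part of vi/vim.
    """
    cmd = ["tee", path]
    if use_sudo:
        # With sudo in front, tee (not the caller) opens the file with
        # elevated rights; sudo may prompt for a password.
        cmd = ["sudo"] + cmd
    # tee echoes its input to stdout; vim shows that echo on screen,
    # here it is simply discarded.
    subprocess.run(
        cmd,
        input=buffer_text.encode(),
        stdout=subprocess.DEVNULL,
        check=True,
    )
```

The point of the trick is that the editor itself never needs write permission; only the short-lived tee process does.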

[Reposted from] http://www.geekyboy.com/archives/629

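A closer look at Fuxi: scheduling and performance optimization for a 5000-node cluster

Alibaba's distributed scheduling system is named “Fuxi”. It manages the cluster's machine resources and schedules concurrent computing tasks, providing stable, efficient and secure resource management and task scheduling services to the distributed applications above it. The 5K project was an all-round campaign that brought technical challenges to Fuxi in scale, performance, stability, operations and more. Working from architecture design, implementation details and module dependencies, we did a great deal of optimization to avoid performance traps, to build an available and reliable cloud computing engine for users, to further reduce costs and to unlock the value of data.

The 5K project was a milestone for the Feitian platform: the system made a leap in scale, performance and fault tolerance and reached a world-leading level. As Feitian's distributed scheduling system, Fuxi can support 5000 nodes in a single cluster, run 10000 jobs concurrently, and complete a 100TB Terasort in 30 minutes, twice the performance of the world record that Yahoo! held at the time on the Sort Benchmark.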

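Introducing Fuxi

“Feitian” (飞天) is Alibaba's cloud computing platform, and its distributed scheduling system is named “Fuxi” (伏羲, code name Fuxi), after a figure from ancient Chinese mythology. Fuxi manages the cluster's machine resources and schedules concurrent computing tasks. It currently supports offline data processing (DAG Job) and online services (Service), provides stable, efficient and secure resource management and task scheduling to upper-layer distributed applications such as ODPS / OSS / OTS, and serves as a powerful computing engine for Alibaba Group's goal of building the number-one data sharing platform.

Fuxi uses an M/S (master/slave) architecture (see Figure 1). The system has a cluster control center called the “Fuxi Master”, and every other machine runs a daemon called the “Fuxi Agent”. Besides managing the tasks running on its node, the daemon collects the node's resource usage and reports it to the control center. The control center and the Fuxi Agents use a heartbeat mechanism to monitor node health. When a user submits a job to the Fuxi Master, the Fuxi Master picks an available node and starts the job's controlling process, the AppMaster, on it. The AppMaster then sends resource requests to the Fuxi Master and, once the Fuxi Master has allocated resources, tells the Fuxi Agents on the corresponding nodes to start running the job's Workers. Fuxi is a scheduling system that supports many concurrent jobs; the Fuxi Master arbitrates among them and supports priorities, resource quotas and preemption.

With Fuxi, users can run common MapReduce jobs and can also host online services, covering the needs of different application scenarios. Multiple users can share a cluster: Fuxi supports per-group resource quotas that limit the computing resources each user group may use. Urgent jobs, such as important data reports, can be given a higher priority so that they get computing resources first.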

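Challenges brought by 5K

While tackling the 5K project, we saw that every step from design to implementation of a large cloud computing platform can hide a performance “trap”, mainly for three reasons. Scale amplification: when the system grows to thousands of nodes, the impact of steps that scale with cluster size, and that were not bottlenecks before, is magnified. The bucket effect: often 99% of the system has already been optimized and finishing the remaining 1% looks like mere “icing on the cake”, yet that 1% may well become the fatal bottleneck for system performance. Long-path module dependencies: handling some requests may span several modules (including external ones), and unstable performance in an external module can end up affecting the performance and stability of the whole request.

The 5K project was an all-round campaign that brought technical challenges to Fuxi in scale, performance, stability, operations and more, for example the performance “traps” below.

■ Communication message DDoS: in a 5000-node cluster, the number of RPC requests between processes grows sharply with scale. Total requests in the network can reach 10000 QPS, which easily congests the message queue of a single-point process and causes severe request timeouts. Message handling also suffers from head-of-line (HoL) blocking.

■ Key-function OPS: the Fuxi Master is the central node for resource scheduling, so the OPS of its key internal scheduling functions must meet a very high standard; otherwise the bucket effect may drag down the scheduling performance of the whole cluster.

■ Failover dependency on external modules: the Fuxi Master has a failover feature that is transparent to users, and its recovery relies on Checkpoints written to Nuwa (note: Nuwa is the Feitian platform's coordination system, used for example as a name service). Overall recovery speed is therefore affected by Nuwa's access speed.

We did a great deal of optimization in Fuxi to avoid the performance “traps” above, touching architecture design, implementation details and module dependencies. We looked through the symptoms to the root causes, starting from low-level performance analysis and tracking down the bottlenecks step by step. Below we share the optimization process through concrete examples.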

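Fuxi optimization in practice

Communication performance optimization

Early in the 5K project, when testing large numbers of concurrent jobs, we found that run times tended to grow once the number of jobs exceeded 1000. Analyzing the monitoring curves and logs, we found that many of the resource requests sent from AppMasters to the Fuxi Master were timing out: AppMasters waited a long time for resources, and resource-request latency was high.

The total time from a message arriving at the Fuxi Master process to its being handled and answered consists mainly of the time spent waiting in the queue and the time actually spent processing. High latency therefore has only two possible causes: the OPS of message processing itself has dropped, or messages are piling up in the pending queue without being handled in time. Following this reasoning, Profiling showed that the Fuxi Master's key resource-scheduling function did not account for most of the message-handling latency, which left message backlog as the only culprit. After plotting the backlog in the Fuxi Master's resource-scheduling message queue, we indeed saw that as the number of jobs increased, the number of queued requests surged (see Figure 2), and the handling time of each request was also much higher than at small scale.

Why would so many messages pile up in the Fuxi Master's queue? In Fuxi, both the Fuxi Agent daemons and the AppMasters need to query resource status from the Fuxi Master, which is responsible for resource scheduling. The communication strategy was periodic Polling, by default once per second. Polling was chosen mainly for its simplicity: it copes robustly with network failures, and the message flow is natural and regular. In a 5000-node cluster, however, this strategy has to be tuned; otherwise the Fuxi Master is effectively hit by a “DDoS attack” of requests and can no longer serve.

Having located the backlog problem, we immediately added flow control to the communication strategy. The algorithm is simple and effective: if the sender sees that the result of its previous query has come back, the Fuxi Master is handling requests smoothly, so the sender waits only a short interval before the next query. Conversely, if the previous query timed out, the Fuxi Master is busy (for example, a job has released a large batch of resources that are waiting to be processed), so the sender waits longer before sending again. With this adaptive flow control, the Fuxi Master's message backlog was effectively resolved.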
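The adaptive flow control described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Fuxi's code: the class name `AdaptivePoller`, the `send` callable and the two delay values are assumptions made up for the example, since the article gives no concrete intervals.

```python
class AdaptivePoller:
    """Sender-side flow control: poll again quickly after a reply,
    back off after a timeout. Names and delays are hypothetical."""

    def __init__(self, send, short_delay=1.0, long_delay=8.0):
        self.send = send                # callable that performs one query
        self.short_delay = short_delay  # used when the master answered
        self.long_delay = long_delay    # used when the last query timed out
        self.delay = short_delay        # seconds to wait before next query

    def poll_once(self):
        """Send one query and update the delay for the next one."""
        try:
            result = self.send()
        except TimeoutError:
            # The master looks busy: wait longer before asking again.
            self.delay = self.long_delay
            return None
        # The master answered: it is keeping up, so poll again soon.
        self.delay = self.short_delay
        return result
```

A caller would sleep for `poller.delay` seconds between calls to `poll_once()`. The appeal of the scheme is that the sender needs no extra signal from the master; the fate of its own last request is the only input.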

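We also solved the head-of-line (HoL) blocking problem for Fuxi Master messages. An AppMaster talks to the Fuxi Master to obtain resource scheduling results, and also talks to Fuxi Agents to start and stop Workers. Because Fuxi Agents far outnumber the Fuxi Master, in the extreme case, if the AppMaster handles all these messages with a single thread pool, Fuxi Master messages get stuck behind a large number of Fuxi Agent messages. We profiled the full path of message handling, from sending to completion, and the results confirmed the head-of-line blocking. When a job has many Workers, the AppMaster has more Fuxi Agents to talk to, and we observed that the time for the AppMaster to obtain resources grew noticeably. To address this, we added a dedicated-thread feature to the communication component to achieve a QoS effect, and applied it to the AppMaster's handling of Fuxi Master messages. As Figure 3 shows, Fuxi Master messages use their own thread pool, while all other messages share another thread pool.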
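The two-pool arrangement can be illustrated with Python's standard `concurrent.futures` module. This is a minimal sketch under assumptions: the class `MessageDispatcher`, the `"master"` source label and the pool sizes are invented for the example and say nothing about how Fuxi's communication component is actually built.

```python
from concurrent.futures import ThreadPoolExecutor


class MessageDispatcher:
    """Route master messages to a dedicated pool so they never queue
    behind agent messages. Hypothetical names and pool sizes."""

    def __init__(self, master_workers=1, shared_workers=4):
        self.master_pool = ThreadPoolExecutor(max_workers=master_workers)
        self.shared_pool = ThreadPoolExecutor(max_workers=shared_workers)

    def submit(self, source, handler, *args):
        """Schedule handler(*args) on the pool that matches the source."""
        pool = self.master_pool if source == "master" else self.shared_pool
        return pool.submit(handler, *args)

    def shutdown(self):
        self.master_pool.shutdown(wait=True)
        self.shared_pool.shutdown(wait=True)
```

Even if every thread in the shared pool is busy with agent messages, a message from the master still finds a free thread, which is the QoS effect the text describes.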

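These two optimizations significantly reduced the communication pressure inside Fuxi and improved communication efficiency. Resource-request communication between AppMasters and the Fuxi Master improved, so a submitted job is allocated resources and starts running quickly, which raises job completion speed under high concurrency. For example, after this optimization, ad hoc SQL queries over massive data issued through the ODPS client became significantly faster.

Key-function optimization

In the 5K project we also paid close attention to the performance of key functions in the system, where “traps” can also hide. A key operation when the Fuxi Master schedules resources is to check whether a node's free resources can satisfy the resource requests queued on that node, and thus decide which job gets the resources. The number of calls to this function is proportional to the number of machines and the number of requests, so its speed has a decisive effect on the Fuxi Master's scheduling OPS.

Fuxi schedules resources in multiple dimensions, such as memory, CPU, network and disk. All resources and requests are represented as multi-dimensional key-value pairs, for example {Mem: 10, CPU: 50, net: 40, disk: 60}. Deciding whether a free resource can satisfy a resource request can therefore be abstracted as a comparison of multi-dimensional vectors, for example R: [r1, r2, r3, r4] > Q: [q1, q2, q3, q4], where the numbers 1, 2, 3, 4 denote the dimensions, and R > Q holds if and only if R is greater than Q in every dimension. The number of comparisons determines the time complexity of this operation. In the best case a single comparison settles the result, as when testing whether [1, 10, 10, 10] is greater than [2, 1, 1, 1], which fails on the first dimension. In the worst case it takes D comparisons (D being the number of dimensions), as when testing whether [10, 10, 10, 1] is greater than [1, 1, 1, 10], which takes 4 comparisons. When resource scheduling happens at high frequency, this comparison must be optimized.

Using Profiling, we analyzed free resources and requests at run time and found that, when resources are plentiful, the dimension with the largest value is usually the hardest to satisfy. For resource scheduling we therefore adopted a primary-key-based optimization: the dimension holding a request's largest value is defined as that vector's primary key. When free resources appear, we first check whether the primary-key dimension satisfies the request, and compare the other dimensions only if it does. In addition, we take the minimum of the primary-key values over all requests queued on a node; if the free resources are smaller than this minimum, no further requests need to be compared. The primary-key algorithm greatly reduced the number of vector comparisons during scheduling, bringing one scheduling pass in the Fuxi Master down to a few milliseconds. Since a resource request does not change after it is submitted, the overhead of computing the primary key is negligible.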
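Here is a small Python sketch of the primary-key idea, using the strict “greater in every dimension” rule from the text. The function names are invented for illustration, and the pruning step reflects one reading of the article: if even the largest free dimension does not exceed the smallest primary-key value in the queue, no queued request can fit. It also assumes the free-resource dict contains every dimension a request mentions.

```python
def primary_key(request):
    """Return (dimension, value) of the request's largest component.
    Computed once at submit time, since requests do not change."""
    dim = max(request, key=request.get)
    return dim, request[dim]


def satisfies(free, request, key_dim):
    """True if free > request in every dimension, checking the
    primary-key dimension first so most failures cost one comparison."""
    if free[key_dim] <= request[key_dim]:
        return False
    return all(free[d] > v for d, v in request.items() if d != key_dim)


def first_fit(free, queued):
    """Index of the first queued request that fits, or None.
    queued holds (request, key_dim, key_value) tuples."""
    if not queued:
        return None
    min_key = min(key_value for _, _, key_value in queued)
    # Pruning (an interpretation of the text): nothing can fit if no
    # free dimension exceeds the smallest primary-key value.
    if max(free.values()) <= min_key:
        return None
    for index, (request, key_dim, _) in enumerate(queued):
        if satisfies(free, request, key_dim):
            return index
    return None
```

With the worst-case example from the text, free [10, 10, 10, 1] against request [1, 1, 1, 10], the primary key is the fourth dimension, so the check fails after one comparison instead of four.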

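Optimizing the Fuxi Master's key scheduling performance strengthened the system's ability to scale. With the Feitian platform, users can manage larger clusters and accommodate more computing jobs, realizing the cost advantage of a cloud computing platform.

Module-dependency performance optimization

The Fuxi Master supports failover. When it recovers after a restart, it must read the description files (Checkpoints) of all jobs from Nuwa in order to keep running user jobs. Because the Nuwa service previously did not persist file contents on the server side, the Fuxi Master would write a Checkpoint back to Nuwa after reading it, and the performance of this write-back depended on the Nuwa module. On a 5000-node cluster, the sharp increase in name-resolution load degraded the performance of write-backs on the Nuwa server, and the degradation propagated through the module dependency to the Fuxi Master, hurting failover performance. In tests, a single Checkpoint write-back took 70 seconds, which greatly reduced Fuxi's availability.

We optimized Fuxi Master failover. From the Fuxi Master's point of view, the Checkpoint contents it has just read during failover will not have changed on the Nuwa server, so there is no need to write them back to the server after reading. It is enough to notify the local Nuwa Agent and let it act as a proxy; the Agent takes care of pushing the locally cached file contents to the server when the server goes down and restarts. So we worked with colleagues on the Nuwa team to add a new local-write-only interface to the Nuwa API, which lets the Fuxi Master avoid the performance risk of writing Checkpoints back during failover. After the optimization, at a test scale of a 5000-node cluster with 5000 concurrent jobs, Checkpoint handling during one failover takes only 18 seconds (most of it spent on the single read). This shows that in a distributed system, a dependency on an external module, even a single RPC request, can be a “performance trap”, and should be kept off the critical path wherever possible in design and implementation.

Failover is a must-have for a distributed system that guarantees availability. After optimization, the Fuxi Master's fast failover strengthened the availability and stability of the Feitian computing platform and masks hardware failures, so that users are not affected.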

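Engineering experience

There are no shortcuts to high-quality code, and rules and processes alone are not enough. It comes down to one word, care: careful authors, careful Reviewers and careful testing.

■ For any Item, whether a Bug fix or a new Feature, the approach must be discussed and settled before any code is written; Code Review is no substitute for design discussion. In the discussion the author must answer two questions: is this solution really feasible, and what are its side effects? These discussions should be recorded and tracked in a tool such as a Wiki or BugFree.

■ Take small, quick steps and submit for Code Review early. Many problems can be found at this stage rather than during testing, where they cost more.

■ The code Reviewer bears half the responsibility for an Item, so a Review is not just a quick skim of the text. My Checklist is: does the code accurately reflect the approach agreed earlier; are there deadlocks or “performance traps”; is the modular encapsulation sufficient; do function names, variable names and log formats follow the conventions; are there enough comments. It is quite common for one piece of code to go through about 10 Review iterations.

■ Always verify with targeted tests.

■ When committing code, link the related Bug and Review IDs so the change can be traced later.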

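Summary

Above we shared some practical experience from the 5K project. Fuxi also carried out many other meaningful system optimizations and technical explorations during the project, and taking part was highly rewarding. Performance is part of functionality: it is the system's lifeline, not icing on the cake. The 5K project is only the beginning for Alibaba's cloud computing platform technology, which will develop further toward larger scale and richer computing models, building an available and reliable cloud computing engine for users, further reducing costs and unlocking the value of data.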

 

[Reposted from] https://lingyun.aliyun.com/4/tech-fuxi.html

The evolution of cluster scheduler architectures

Cluster schedulers are an important component of modern infrastructure, and have evolved significantly in the last few years. Their architecture has moved from monolithic designs to much more flexible, disaggregated and distributed designs. However, many current open-source offerings are either still monolithic, or otherwise lack key features. These features matter to real-world users, as they are required to achieve good utilization.

This post is our first in a series of posts about task scheduling on large clusters, such as those operated by internet companies like Amazon, Google, Facebook, Microsoft, or Yahoo!, but increasingly elsewhere too. Scheduling is an important topic because it directly affects the cost of operating a cluster: a poor scheduler results in low utilization, which costs money as expensive machines are left idle. High utilization, however, is not sufficient on its own: antagonistic workloads interfere with other workloads unless the decisions are made carefully.

Architectural evolution

This post discusses how scheduler architectures have evolved over the last few years, and why this happened. Figure 1 visualises the different approaches: a gray square corresponds to a machine, a coloured circle to a task, and a rounded rectangle with an “S” inside corresponds to a scheduler [0]. Arrows indicate placement decisions made by schedulers, and the three colours correspond to different workloads (e.g., web serving, batch analytics, and machine learning).

(a) Monolithic scheduler. (b) Two-level scheduling. (c) Shared-state scheduling. (d) Distributed scheduling. (e) Hybrid scheduling.

Figure 1: Different cluster scheduler architectures. Gray boxes represent cluster machines, circles correspond to tasks and Si denotes scheduler i.

Many cluster schedulers – such as most high-performance computing (HPC) schedulers, the Borg scheduler, various early Hadoop schedulers and the Kubernetes scheduler – are monolithic. A single scheduler process runs on one machine (e.g., the JobTracker in Hadoop v1, and kube-scheduler in Kubernetes) and assigns tasks to machines. All workloads are handled by the same scheduler, and all tasks run through the same scheduling logic (Figure 1a). This is simple and uniform, and has led to increasingly sophisticated schedulers being developed. As an example, see the Paragon and Quasar schedulers, which use a machine learning approach to avoid negative interference between workloads competing for resources.

Most clusters run different types of applications today (as opposed to, say, just Hadoop MapReduce jobs in the early days). However, maintaining a single scheduler implementation that handles mixed (heterogeneous) workloads can be tricky, for several reasons:

  1. It is quite reasonable to expect a scheduler to treat long-running service jobs and batch analytics jobs differently.
  2. Since different applications have different needs, supporting them all keeps adding features to the scheduler, increasing the complexity of its logic and implementation.
  3. The order in which the scheduler processes tasks matters: queueing effects (e.g., head-of-line blocking) and backlog can become a problem unless the scheduler is carefully designed.

Overall, this sounds like the makings of an engineering nightmare – and the never-ending lists of feature requests that scheduler maintainers receive attest to this [1].

Two-level scheduling architectures address this problem by separating the concerns of resource allocation and task placement. This allows the task placement logic to be tailored towards specific applications, but also maintains the ability to share the cluster between them. The Mesos cluster manager pioneered this approach, and YARN supports a limited version of it. In Mesos, resources are offered to application-level schedulers (which may pick and choose from them), while YARN allows the application-level schedulers to request resources (and receive allocations in return) [2]. Figure 1b shows the general idea: workload-specific schedulers (S0–S2) interact with a resource manager that carves out dynamic partitions of the cluster resources for each workload. This is a very flexible approach that allows for custom, workload-specific scheduling policies.

Yet, the separation of concerns in two-level architectures comes with a drawback: the application-level schedulers lose omniscience, i.e., they cannot see all the possible placement options any more [3]. Instead, they merely see those options that correspond to resources offered (Mesos) or allocated (YARN) by the resource manager component. This has several disadvantages:

  1. Priority preemption (higher priority tasks kick out lower priority ones) becomes difficult to implement: in an offer-based model, the resources occupied by running tasks aren’t visible to the upper-level schedulers; in a request-based model, the lower-level resource manager must understand the preemption policy (which may be application-dependent).
  2. Schedulers are unable to consider interference from running workloads that may degrade resource quality (e.g., “noisy neighbours” that saturate I/O bandwidth), since they cannot see them.
  3. Application-specific schedulers care about many different aspects of the underlying resources, but their only means of choosing resources is the offer/request interface with the resource manager. This interface can easily become quite complex.

Shared-state architectures address this by moving to a semi-distributed model [4], in which multiple replicas of cluster state are independently updated by application-level schedulers, as shown in Figure 1c. After the change is applied locally, the scheduler issues an optimistically concurrent transaction to update the shared cluster state. This transaction may fail, of course: another scheduler may have made a conflicting change in the meantime.
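To make the optimistic transaction concrete, here is a toy Python sketch. It is not taken from Omega, Apollo or Nomad: the class `SharedClusterState`, the per-machine version counters used for conflict detection, and the retry loop in `schedule` are all assumptions chosen for illustration, and the sketch is single-threaded (a real system would apply commits atomically).

```python
class SharedClusterState:
    """Toy shared state: free CPUs per machine plus a version counter
    per machine for conflict detection (hypothetical design)."""

    def __init__(self, free_cpus):
        self._free = dict(free_cpus)
        self._version = {machine: 0 for machine in free_cpus}

    def snapshot(self):
        """Give a scheduler its own private copy to plan against."""
        return dict(self._free), dict(self._version)

    def commit(self, machine, cpus, seen_version):
        """Apply a claim only if the machine is unchanged since the
        snapshot; otherwise report a conflict by returning False."""
        if self._version[machine] != seen_version:
            return False
        if self._free[machine] < cpus:
            return False
        self._free[machine] -= cpus
        self._version[machine] += 1
        return True


def schedule(state, cpus, max_retries=3):
    """Plan on a local copy, then try to commit; retry on conflict."""
    for _ in range(max_retries):
        free, versions = state.snapshot()
        candidates = [m for m, f in free.items() if f >= cpus]
        if not candidates:
            return None
        machine = max(candidates, key=free.get)  # decided locally
        if state.commit(machine, cpus, versions[machine]):
            return machine
    return None
```

Two schedulers that snapshot at the same moment and both pick the same machine illustrate the failure case: the first commit wins, the second sees a stale version and must re-plan.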

The most prominent examples of shared-state designs are Omega at Google, and Apollo at Microsoft, as well as the Nomad container scheduler by Hashicorp. All of these materialise the shared cluster state in a single location: the “cell state” in Omega, the “resource monitor” in Apollo, and the “plan queue” in Nomad [5]. Apollo differs from the other two as its shared-state is read-only, and the scheduling transactions are submitted directly to the cluster machines. The machines themselves check for conflicts and accept or reject the changes. This allows Apollo to make progress even if the shared-state is temporarily unavailable [6].

A “logical” shared-state design can also be achieved without materialising the full cluster state anywhere. In this approach (somewhat similar to what Apollo does), each machine maintains its own state and sends updates to different interested agents such as schedulers, machine health monitors, and resource monitoring systems. Each machine’s local view of its state now forms a “shard” of the global shared-state.

However, shared-state architectures have some drawbacks, too: they must work with stale information (unlike a centralized scheduler), and may experience degraded scheduler performance under high contention (although this can apply to other architectures as well).

Fully-distributed architectures take the disaggregation even further: they have no coordination between schedulers at all, and use many independent schedulers to service the incoming workload, as shown in Figure 1d. Each of these schedulers works purely with its local, partial, and often out-of-date view of the cluster. Jobs can typically be submitted to any scheduler, and each scheduler may place tasks anywhere in the cluster. Unlike with two-level schedulers, there are no partitions that each scheduler is responsible for. Instead, the overall schedule and resource partitioning are emergent consequences of statistical multiplexing and randomness in workload and scheduler decisions – similar to shared-state schedulers, albeit without any central control at all.

The recent distributed scheduler movement probably started with the Sparrow paper, although the underlying concept (power of multiple random choices) first appeared in 1996. The key premise of Sparrow is a hypothesis that the tasks we run on clusters are becoming ever shorter in duration, supported by an argument that fine-grained tasks have many benefits. Consequently, the authors assume that tasks are becoming more numerous, meaning that a higher decision throughput must be supported by the scheduler. Since a single scheduler may not be able to keep up with this throughput (assumed to be a million tasks per second!), Sparrow spreads the load across many schedulers.
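The “power of multiple random choices” idea is easy to show in code. The sketch below is only the basic two-choice placement, not Sparrow's full algorithm; the function name `place_task` and the list-of-queue-lengths representation are assumptions for the example.

```python
import random


def place_task(queue_lengths, d=2, rng=random):
    """Probe d workers chosen at random and enqueue the task at the
    one with the shortest queue. Returns the chosen worker index.

    queue_lengths is a mutable list; d must not exceed its length.
    """
    probes = rng.sample(range(len(queue_lengths)), d)
    chosen = min(probes, key=lambda worker: queue_lengths[worker])
    queue_lengths[chosen] += 1
    return chosen
```

With d=1 this degenerates to purely random placement. Going from one probe to two typically shrinks the longest queue considerably, while each scheduler still needs no shared state at all, which is what makes the approach attractive for many independent schedulers.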

This makes perfect sense, and the lack of central control can be conceptually appealing; it also suits some workloads very well – more on this in a future post. For the moment, it suffices to note that since the distributed schedulers are uncoordinated, they apply significantly simpler logic than advanced monolithic, two-level, or shared-state schedulers. For example:

  1. Distributed schedulers are typically based on a simple “slot” concept that chops each machine into n uniform slots, and places up to n parallel tasks. This glosses over the fact that tasks’ resource requirements are not uniform.
  2. They also use worker-side queues with simple service disciplines (e.g., FIFO in Sparrow), which restricts scheduling flexibility, as the scheduler can merely choose at which machine to enqueue a task.
  3. Distributed schedulers have difficulty enforcing global invariants (e.g., fairness policies or strict priority precedence), since there is no central control.
  4. Since they are designed for rapid decisions based on minimal knowledge, distributed schedulers cannot support or afford complex or application-specific scheduling policies. Avoiding interference between tasks, for example, becomes tricky.

Hybrid architectures are a recent (mostly academic) invention that seeks to address these drawbacks of fully distributed architectures by combining them with monolithic or shared-state designs. The way this typically works – e.g., in Tarcil, Mercury, and Hawk – is that there really are two scheduling paths: a distributed one for part of the workload (e.g., very short tasks, or low-priority batch workloads), and a centralized one for the rest. Figure 1e illustrates this design. The behaviour of each constituent part of a hybrid scheduler is identical to the part’s architecture described above. As far as I know, however, no hybrid schedulers have been deployed in production settings yet.

What does this mean in practice?

Discussion about the relative merits of different scheduler architectures is not merely an academic topic, although it naturally revolves around research papers. For an extensive discussion of the Borg, Mesos and Omega papers from an industry perspective, for example, see Andrew Wang’s excellent blog post. Moreover, many of the systems discussed are deployed in production settings at large enterprises (e.g., Apollo at Microsoft, Borg at Google, and Mesos at Apple), and they have in turn inspired other systems that are available as open source projects.

These days, many clusters run containerised workloads, and consequently a variety of container-focused “orchestration frameworks” have appeared. These are similar to what Google and others call “cluster managers”. However, there are few detailed discussions of the schedulers within these frameworks and their design principles, and they typically focus more on the user-facing scheduler APIs (e.g., this report by Armand Grillet, which compares Docker Swarm, Mesos/Marathon, and the Kubernetes default scheduler). Moreover, many users neither know what difference the scheduler architecture makes, nor which one is most suitable for their applications.

Figure 2 shows an overview of a selection of open-source orchestration frameworks, their architecture and the features supported by their schedulers. At the bottom of the table, we also include closed-source systems at Google and Microsoft for reference. The resource granularity column indicates whether the scheduler assigns tasks to fixed-size slots, or whether it allocates resources in multiple dimensions (e.g., CPU, memory, disk I/O bandwidth, network bandwidth, etc.).

Framework | Architecture | Resource granularity | Multi-scheduler | Pluggable logic | Priority preemption | Re-scheduling | Over-subscription | Resource estimation | Avoid interference

Open source:
Kubernetes | monolithic | multi-dimensional | N[v1.2,DD,Issue] | Y[DD] | N[Issue] | N[Issue] | Y[DD] | N | N
Swarm | monolithic | multi-dimensional | N | N | N[Issue] | N | N | N | N
YARN | monolithic/two-level | RAM/CPU slots | Y | N[app-lvl. only] | N[JIRA] | N | N[JIRA] | N | N
Mesos | two-level | multi-dimensional | Y | Y[framework-lvl.] | N[JIRA] | N | Y[v0.23,Doc] | N | N
Nomad | shared-state | multi-dimensional | Y | Y | N[Issue] | N[Issue] | N[Issue] | N | N
Sparrow | fully-distributed | fixed slots | Y | N | N | N | N | N | N

Closed source:
Borg | monolithic[7] | multi-dimensional | N[7] | N[7] | Y | Y | Y | Y | N
Omega | shared-state | multi-dimensional | Y | Y | Y | Y | Y | Y | N
Apollo | shared-state | multi-dimensional | Y | Y | Y | Y | N | N | N

Figure 2: Architectural classification and feature matrix of widely-used orchestration frameworks, compared to closed-source systems.

One key aspect that helps determine an appropriate scheduler architecture is whether your cluster runs a heterogeneous (i.e., mixed) workload. This is the case, for example, when combining production front-end services (e.g., load-balanced web servers and memcached) with batch data analytics (e.g., MapReduce or Spark). Such combinations make sense in order to improve utilization, but the different applications have different scheduling needs. In a mixed setting, a monolithic scheduler likely results in sub-optimal assignments, since the logic cannot be diversified on a per-application basis. A two-level or shared-state scheduler will likely offer benefits here [8].

Most user-facing service workloads run with resource allocations sized to serve peak demand expected of each container, but in practice they typically under-utilize their allocations substantially. In this situation, being able to opportunistically over-subscribe the resources with lower-priority workloads (while maintaining QoS guarantees) is the key to an efficient cluster. Mesos is currently the only open-source system that ships support for such over-subscription, although Kubernetes has a fairly mature proposal for adding it. We should expect more activity in this space in the future, since the utilization of most clusters is still substantially lower than the 60-70% reported for Google’s Borg clusters. We will focus on resource estimation, over-subscription and efficient machine utilization in a future post in this series.

Finally, specific analytics and OLAP-style applications (for example, Dremel or SparkSQL queries) can benefit from fully-distributed schedulers. However, fully-distributed schedulers (e.g., Sparrow) come with fairly restricted feature sets, and thus work best when the workload is homogeneous (i.e., all tasks run for roughly the same time), set-up times are low (i.e., tasks are scheduled to long-running workers, as with MapReduce application-level tasks in YARN), and task churn is very high (i.e., many scheduling decisions must be made in a short time). We will talk more about these conditions and why fully-distributed schedulers – and the distributed components of hybrid schedulers – only make sense for these applications in the next blog post in this series. For now, it suffices to observe that distributed schedulers are substantially simpler than others, and do not support multiple resource dimensions, over-subscription, or re-scheduling.

Overall, the table in Figure 2 is evidence that the open-source frameworks still have some way to go until they match the feature sets of advanced, but closed-source systems. This should serve as a call to action: as a result of missing features, utilization suffers, task performance is unpredictable, noisy neighbours cause pagers to go off, and elaborate hacks are required to coerce schedulers into supporting some user needs.

However, there is some good news: while many frameworks have monolithic schedulers today, many are also moving towards more flexible designs. Kubernetes already supports pluggable schedulers (the kube-scheduler pod can be replaced by another API-compatible scheduler pod), multiple schedulers from v1.2, and has ongoing work on “extenders” to supply custom policies. Docker Swarm may – to our understanding – also gain pluggable scheduler support in the future.

What’s next?

The next blog post in this series will look at the question of whether fully distributed architectures are the key innovation required to scale cluster schedulers further (spoiler: not necessarily). After that, we will also look at resource-fitting strategies (essential for good utilisation), and finally discuss how our Firmament scheduling platform combines many of the benefits of a shared-state architecture with the scheduling quality of monolithic schedulers and the speed of fully-distributed schedulers.

 

Correction: March 10, 2016
An earlier version of the text incorrectly reported the implementation status of some Kubernetes features. We amended the table in Figure 2 and the text to clarify that scheduler extenders are implemented, and that over-subscription is supported although automatic resource estimation is not. We also added a footnote explaining that a single scheduler can serve a mixed workload, but that its complexity will be high.

Correction: March 15, 2016
An earlier version of the text suggested that YARN and Mesos are two-level designs in an equal sense. However, YARN’s application-level scheduling is substantially less powerful than Mesos’s. This is now clearer in the text, and clarified further in footnote 2.


 


0 – This figure simplifies things a bit: of course, in practice each machine runs more than one task, and many schedulers fit tasks in multiple resource dimensions, rather than into simple slots.

1 – As an illustrative example, kube-scheduler in Kubernetes currently has outstanding feature requests for re-scheduling (pod migration), priority preemption, and resource over-subscription in its monolithic scheduler.

2 – YARN’s approach is restricted compared to Mesos because the application-level logic cannot choose resources (unless it requests much more than it needs from the resource manager), but it can only place application-level “tasks” to pre-existing containers that represent cluster-level tasks.
This is a good fit for a system like Hadoop MapReduce, in which application-level tasks (maps and reduces) must be assigned to a dynamic collection of workers in an application-specific way (e.g., optimised for data locality and per-job). It is less suited to building a more general, multi-application scheduler on top – for example, a service scheduler like the “Marathon” framework for Mesos.
Monolithic schedulers like the Kubernetes one do not support this and rely on the application doing its own scheduling (e.g., running a Spark “worker controller” as a long-running service). Consequently, there are efforts to put Kubernetes on top of YARN via a special YARNScheduler extension – requiring two complex systems to be administered. However, there are also long-term efforts to improve native “big data” batch processing support in Kubernetes.

3 – In the Omega paper, this problem is referred to as “information hiding”.

4 – Curiously, the literature does not appear to be in agreement about whether to consider shared-state schedulers centralized or distributed: the Hawk paper treats them as examples of distributed schedulers, while the Mercury paper refers to them as examples of a centralized architecture!

5 – Nomad actually uses a slightly different approach to Omega and Apollo: while multiple independent schedulers exist, jobs are not submitted directly to them, but instead arrive via a centralised “evaluation broker” queue.

6 – It’s worth noting that the same optimisation – taking the shared-state off the critical path to enacting scheduling decisions – can be applied to Omega, but not to Nomad (in its current design): Omega can ship deltas directly to machines and update the cell state out-of-band, while Nomad’s design is premised on the leader reconciling changes in the plan queue.

7 – The table entry reflects the original Borg, but the Borg paper and the recent ACM Queue paper note that multi-scheduler support and other features have been back-ported from Omega into Borg.

8 – That said, having multiple schedulers is not a necessary precondition for serving mixed workloads: the Borg scheduler is a case in point that a sophisticated single scheduler can serve both long-running service and batch workloads. However, this comes at the expense of higher scheduler implementation complexity – a key motivation for Omega’s multi-scheduler design.

 

[Reposted from] http://www.cl.cam.ac.uk/research/srg/netos/camsas/blog/2016-03-09-scheduler-architectures.html

mesos, omega, borg: a survey

Google recently unveiled one of their crown jewels of system infrastructure: Borg, their cluster scheduler. This prompted me to re-read the Mesos and Omega papers, which deal with the same topic. I thought it’d be interesting to compare and contrast these systems. Mesos gets credit for the groundbreaking idea of two-level scheduling, Omega improved upon this with an analogy from databases, and Borg can sort of be seen as the culmination of all these ideas.

Background

Cluster schedulers existed long before big data. There’s a rich literature on scheduling on 1000s of cores in the HPC world, but their problem domain is simpler than what is addressed by datacenter schedulers, meaning Mesos/Borg and their ilk. Let’s compare and contrast on a few dimensions.

Scheduling for locality

Supercomputers separate storage and compute and connect them with an approximately full-bisection bandwidth network that goes at close to memory speeds (GB/s). This means your tasks can get placed anywhere on the cluster without worrying much about locality, since all compute nodes can access data equally quickly. There are a few hyper-optimized applications that optimize for the network topology, but these are very rare.

Data center schedulers do care about locality, and in fact this is the whole point of GFS and MapReduce co-design. Back in the 2000s, network bandwidth was comparatively much more expensive than disk bandwidth. So, there were huge economic savings in scheduling your computation tasks on the same node that held the data. This is a major scheduling constraint; whereas before you could put the task anywhere, now it needs to go on one of the three data replicas.

Hardware configuration

Supercomputers are typically composed of homogeneous nodes, i.e. they all have the same hardware specs. This is because supercomputers are typically purchased in one shot: a lab gets $x million for a new one, and they spend it all upfront. Some HPC applications are optimized for the specific CPU models in a supercomputer. New technology like GPUs or co-processors is rolled out as a new cluster.

In the big data realm, clusters are primarily storage constrained, so operators are continually adding new racks with updated specs to expand cluster capacity. This means it’s typical for nodes to have different CPUs, memory capacities, number of disks, etc. Also toss in special additions like SSDs, GPUs, shingled drives. A single datacenter might need to support a broad range of applications, and all of this again imposes additional scheduling constraints.

Queue management and scheduling

When running an application on a supercomputer, you specify how many nodes you want, the queue you want to submit your job to, and how long the job will run for. Queues place different restrictions on how many resources you can request and how long your job can run for. Queues also have a priority or reservation based system to determine ordering. Since the job durations are all known, this is a pretty easy box packing problem. If the queues are long (typically true) and there’s a good mix of small jobs to backfill the space leftover from big jobs (also typical), you can achieve extremely high levels of utilization. I like to visualize this in 2D, with time as X and resource usage as Y.

As noted in the previous section, datacenter scheduling is a more general problem. The “shape” of resource requests can be quite varied, and there are more dimensions. Jobs also do not have a set duration, so it’s hard to pre-plan queues. We therefore need more sophisticated scheduling algorithms, and the performance of the scheduler itself becomes important.

Utilization as a general rule is going to be worse (unless you’re Google; more on that later), but one benefit over HPC workloads is that MapReduce and similar can be incrementally scheduled instead of gang scheduled. In HPC, the scheduler waits until all N nodes that you requested are available, then runs all your tasks at once. MR can instead run its tasks in multiple waves, meaning it can still effectively use bits of leftover resources. A single MR job can also ebb and flow based on cluster demand, which avoids the need for preemption or resource reservations, and also helps with fairness between multiple users.
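The difference is easy to see with a toy calculation. This sketch assumes equally sized tasks and a fixed number of free slots, and both function names are made up.

```python
import math


def gang_can_start(num_tasks, free_slots):
    """Gang scheduling: nothing runs until every task has a slot."""
    return free_slots >= num_tasks


def waves_needed(num_tasks, free_slots):
    """Incremental scheduling: run as many tasks as fit, then repeat."""
    if free_slots <= 0:
        raise ValueError("need at least one free slot to make progress")
    return math.ceil(num_tasks / free_slots)
```

A 100-task job facing 30 free slots cannot start at all under gang scheduling, but as a MapReduce-style job it finishes in four waves using only the leftovers.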

Mesos

Mesos predates YARN, and was designed with the problems of the original MapReduce in mind. Back then, Hadoop clusters could run only a single application: MapReduce. This made it difficult to run applications that didn’t conform to a map phase followed by a reduce phase. The biggest example here is Spark. Previously, you’d have to install a whole new set of workers and masters for Spark, which would sit alongside your MapReduce workers and masters. Hardly ideal from a utilization perspective, since they were typically statically partitioned.

Mesos addresses this problem by providing a generalized scheduler for all cluster applications. MapReduce and Spark became simply different applications using the same underlying resource sharing framework. The simplest approach would be to write a centralized scheduler, but that has a number of drawbacks:

  • API complexity. We need a single API that is a superset of all known framework scheduler APIs. This is difficult by itself. Expressing resource requests will also become very complicated.
  • Performance. Tens of thousands of nodes and millions of tasks is a lot, especially if the scheduling problem is complex.
  • Code agility. New schedulers and new frameworks are constantly being written, with new requirements.

Instead, Mesos introduces the idea of two-level scheduling. Mesos delegates the per-application scheduling work to the applications themselves, while Mesos still remains responsible for resource distribution between applications and enforcing overall fairness. This means Mesos can be pretty thin, 10K lines of code.

Two-level scheduling happens through a novel API called resource offers, where Mesos periodically offers some resources to the application schedulers. This sounds backwards at first (the request goes from the master to the application?), but it’s actually not that strange. In MR1, the TaskTracker workers are the source of truth as to what’s running on a node. When a TT heartbeats in saying that a task has completed, the JobTracker then chooses something else to run on that TaskTracker. Scheduling decisions are triggered by what’s essentially a resource offer from the worker. In Mesos, the resource offer comes from the Mesos master instead of the slave, since Mesos is managing the cluster. Not that different.

Resource offers act as time-bounded leases for some resources. Mesos offers resources to an application based on policies like priority or fair share. The app then computes how it uses them, and tells Mesos what resources from the offer it wants. This gives the app lots of flexibility, since it can choose to run a portion of tasks now, wait for a bigger allocation later (gang scheduling), or size its tasks differently to fit what’s available. Since offers are time-bounded, it also incentivizes applications to schedule quickly.
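A minimal sketch of one offer round, to show which side decides what. This is not the Mesos API: the `Offer` class, the `GreedyFramework` scheduler and `offer_round` are hypothetical names, and offer timeouts and fair-share policy are left out.

```python
from dataclasses import dataclass


@dataclass
class Offer:
    node: str
    cpus: float
    mem_gb: float


class GreedyFramework:
    """Hypothetical app scheduler: launches as many fixed-size tasks as fit."""

    def __init__(self, pending_tasks, task_cpus, task_mem_gb):
        self.pending = pending_tasks
        self.task_cpus = task_cpus
        self.task_mem_gb = task_mem_gb

    def on_offer(self, offer):
        # Second level: the application decides how to use the offer.
        fit = int(min(offer.cpus // self.task_cpus,
                      offer.mem_gb // self.task_mem_gb))
        launch = min(fit, self.pending)
        self.pending -= launch
        return launch


def offer_round(offer, framework):
    """First level: the master hands out an offer and reclaims the remainder."""
    launched = framework.on_offer(offer)
    leftover = Offer(
        offer.node,
        offer.cpus - launched * framework.task_cpus,
        offer.mem_gb - launched * framework.task_mem_gb,
    )
    return launched, leftover
```

The master never needs to understand task sizes or placement preferences. It only tracks what was offered and what came back, which is what keeps it thin.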

Some concerns and how they were addressed:

  • Long tasks hogging resources. Mesos lets you reserve some resources for short tasks, killing them after a time limit. This also incentivizes using short tasks, which is good for fairness.
  • Performance isolation. Use Linux Containers (cgroups).
  • Starvation of large tasks. It’s difficult to get sole access to a node, since some other app with smaller tasks will snap it up. The fix is having a minimum offer size.

Unaddressed / unknown resolution:

  • Gang scheduling. I think this is impossible to do with high utilization without either knowing task lengths or preempting. Incrementally hoarding resources works with low utilization, but can result in deadlock.
  • Cross-application preemption is also hard. The resource offer API has no way of saying “here are some low-priority tasks I could kill if you want them”. Mesos depends on tasks being short to achieve fairness.

Omega

Omega is sort of a successor to Mesos, and in fact shares an author. Since the paper uses simulated results for its evaluation, I suspect it never went into production at Google, and the ideas were rolled into the next generation of Borg. Rewriting the API is probably too invasive of a change, even for Google.

Omega takes the resource offers one degree further. In Mesos, resource offers are pessimistic or exclusive. If a resource has been offered to an app, the same resource won’t be offered to another app until the offer times out. In Omega, resource offers are optimistic. Every application is offered all the available resources on the cluster, and conflicts are resolved at commit time. Omega’s resource manager is essentially just a relational database of all the per-node state with different types of optimistic concurrency control to resolve conflicts. The upside of this is vastly increased scheduler performance (full parallelism) and better utilization.
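Here is a toy version of the commit-time conflict check, assuming a per-node version counter and all-or-nothing commits. The paper discusses several conflict-detection and commit granularities; this sketch picks one for illustration, and the `CellState` class is my own invention.

```python
class CellState:
    """Toy shared cell state: free CPUs per node plus a per-node version."""

    def __init__(self, free_cpus):
        self.free = dict(free_cpus)
        self.version = {node: 0 for node in free_cpus}

    def snapshot(self):
        # Every scheduler plans against its own private copy of the full state.
        return dict(self.free), dict(self.version)

    def commit(self, claims, seen_versions):
        """Apply claims {node: cpus} atomically, or reject the whole transaction.

        A claim conflicts if the node changed since the scheduler's snapshot.
        """
        for node, cpus in claims.items():
            if self.version[node] != seen_versions[node]:
                return False  # someone else committed first; caller must retry
            if self.free[node] < cpus:
                return False
        for node, cpus in claims.items():
            self.free[node] -= cpus
            self.version[node] += 1
        return True
```

Two schedulers that snapshot the same state and claim the same node will see exactly one commit succeed. The loser re-syncs and retries, which is the duplicated work that optimistic concurrency control trades for parallelism.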

The downside of all this is that applications are in a free-for-all where they are allowed to gobble up resources as fast as they want, and even preempt other users. This is okay for Google because they use a priority-based system, and can go yell at their internal users. Their workload broadly falls into just two priority bands: high-priority service jobs (HBase, webservers, long-lived services) and low-priority batch jobs (MapReduce and similar). Applications are allowed to preempt lower-priority jobs, and are also trusted to stay within their cooperatively enforced limits on # of submitted jobs, amount of allocated resources, etc. I think Yahoo has said differently about being able to go yell at users (certainly not scalable), but it works somehow at Google.

Most of the paper talks about how this optimistic allocation scheme works with conflicts, which is always the question. There are a few high-level notes:

  • Service jobs are larger, and have more rigorous placement requirements for fault-tolerance (spread across racks).
  • Omega can probably scale up to 10s but not 100s of schedulers, due to the overhead of distributing the full cluster state.
  • Scheduling times of a few seconds are typical. They also compare up to 10s and 100s of seconds, which is where the benefits of two-level scheduling really kick in. Not sure how common this is, maybe for service jobs?
  • Typical cluster utilization is about 60%.
  • Conflicts are rare enough that OCC works in practice. They were able to go up to 6x their normal batch workload before the scheduler fell apart.
  • Incremental scheduling is very important. Gang-scheduling is significantly more expensive to implement due to increased conflicts. Apparently most applications can do incremental okay, and can just do a couple partial allocations to get up to their total desired amount.
  • Even for complicated schedulers (10s per-job overheads), Omega can still schedule a mixed workload with reasonable wait times.
  • Experimenting with a new MapReduce scheduler was empirically easy with Omega

Open questions

  • At some point, optimistic concurrency control breaks down because of a high conflict rate and the duplicated work from retries. It seems like they won’t run into this in practice, but I wonder if there are worst-case scenarios with oddly-shaped tasks. Is this affected by the mix of service and batch jobs? Is this something that is tuned in practice?
  • Is a lack of global policies really acceptable? Fairness, preemption, etc.
  • What’s the scheduling time like for different types of jobs? Have people written very complicated schedulers?

Borg

This is a production experience paper. It’s the same workload as Omega since it’s also Google, so many of the metapoints are the same.

High-level

  • Everything runs within Borg, including the storage systems like CFS and BigTable.
  • Median cluster size is 10K nodes, though some are much bigger.
  • Nodes can be very heterogeneous.
  • Linux process isolation is used (essentially containers), since Borg predates modern virtual machine infrastructure. Efficiency and launch time were important.
  • All jobs are statically linked binaries.
  • Very complicated, very rich resource specification language available
  • Can rolling update running jobs, meaning configuration and binary. This sometimes requires a task restart, so fault-tolerance is important.
  • Support for “graceful stop” via SIGTERM before final kill via SIGKILL. The soft kill is optional, and can not be relied on for correctness.

Allocs

  • Resource allocation is separated from process liveness. An alloc can be used for task grouping or to hold resources across task restarts.
  • An alloc set is a group of allocs on multiple machines. Multiple jobs can be run within a single alloc.
  • This is actually a pretty common pattern! Multi-process is useful to separate concerns and development.

Priorities and quotas

  • Two priority bands: high and low for service and batch.
  • Higher priority jobs can preempt lower priority
  • High priority jobs cannot preempt each other (prevents cascading livelock situations)
  • Quotas are used for admission control. Users pay more for quota at higher priorities.
  • Also provide a “free” tier that runs at lowest priority, to encourage high utilization and backfill work.
  • This is a simple and easy to understand system!

Scheduling

  • Two phases to scheduling: finding feasible nodes, then scoring these nodes for final placement.
  • Feasibility is heavily determined by task constraints.
  • Scoring is mostly determined by system properties, like best-fit vs. worst-fit, job mix, failure domains, locality, etc.
  • Once final nodes are chosen, Borg will preempt to fit if necessary.
  • Typical scheduling time is around 25s, because of localizing dependencies. Downloading the binaries is 80% of this. This locality matters. Torrent and tree protocols are used to distribute binaries.
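The two phases can be sketched like this. It is a simplification under my own assumptions: a single CPU dimension, label-based constraints, and a pure best-fit score. Preemption and the richer scoring inputs listed above are left out, and the dictionary layout is hypothetical.

```python
def schedule(task, nodes):
    """Pick a node for a task in two phases: feasibility, then scoring.

    task: dict with 'cpus' and 'constraints' (set of required node labels).
    nodes: list of dicts with 'name', 'free_cpus' and 'labels' (a set).
    Returns the chosen node name, or None if no node is feasible.
    """
    # Phase 1: feasibility. Hard constraints only, no preferences.
    feasible = [
        n for n in nodes
        if n["free_cpus"] >= task["cpus"] and task["constraints"] <= n["labels"]
    ]
    if not feasible:
        return None
    # Phase 2: scoring. Best fit here: leave the least CPU stranded.
    best = min(feasible, key=lambda n: n["free_cpus"] - task["cpus"])
    return best["name"]
```

Swapping the `key` function is all it takes to move from best-fit to worst-fit, which is why it helps to keep scoring separate from feasibility.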

Scalability

  • Centralization has not been an impossible performance bottleneck.
  • 10s of thousands of nodes, 10K tasks per minute scheduling rate.
  • Typical Borgmaster uses 10-14 cores and 50GB of RAM.
  • Architecture has become more and more multi-process over time, with reference to Omega and two-level scheduling.
  • Single master Borgmaster, but some responsibilities are still sharded: state updates from workers, read-only RPCs.
  • Some obvious optimizations: cache machine scores, compute feasibility once per task type, don’t attempt global optimality when making scheduling decisions.
  • Primary argument against bigger cells is isolation from operator errors and failure propagation. Architecture keeps scaling fine

Utilization

  • Their primary metric was cell compaction, or the smallest cluster that can still fit a set of tasks. Essentially box packing.
  • Big gains from the following: not segregating workloads or users, having big shared clusters, fine-grained resource requests.
  • Optimistic overcommit on a per-Borglet basis. Borglets do resource estimation, and backfill non-prod work. If the estimation is incorrect, kill off the non-prod work. Memory is the inelastic resource.
  • Sharing does not drastically affect CPI (cycles per instruction, used as a measure of CPU interference), but I wonder about the effect on storage.

Lessons learned

The issues listed here are pretty much fixed in Kubernetes, their public, open-source container scheduler.

Bad:

  • Would be nice to schedule multi-job workflows rather than single jobs, for tracking and management. This also requires more flexible ways of referring to components of a workflow. This is solved by attaching arbitrary key-value pairs to each task and allowing users to query against them.
  • One IP per machine. This leads to port conflicts on a single machine and complicates binding and service discovery. This is solved by Linux namespaces, IPv6, SDN.
  • Complicated specification language. Lots of knobs to turn, which makes it hard to get started as a casual user. Some work on automatically determining resource requirements.

Good:

  • Allocs are great! Allows helper services to be easily placed next to the main task.
  • Baking in services like load balancing and naming is very useful.
  • Metrics, debugging, web UIs are very important so users can solve their own problems.
  • Centralization scales up well, but need to split it up into multiple processes. Kubernetes does this from the start, meaning a nice clean API between the different scheduler components.

Closing remarks

It seems like YARN will need to draw from Mesos and Omega to scale up to the 10K node scale. YARN is still a centralized scheduler, which is the strawman for comparison in Mesos and Omega. Borg specifically mentions the need to shard to scale.

Isolation is very important to achieve high utilization without compromising SLOs. This can surface at the application layer, where apps themselves need to be designed to be latency-tolerant. Think tail-at-scale request replication in BigTable. Ultimately it comes down to hardware spend vs. software spend. Running at lower utilization sidesteps this problem. Or, you can tackle it head-on through OS isolation mechanisms, resource estimation, and tuning your workload and schedulers. At Google-scale, there’s enough hardware that it makes sense to hire a bunch of kernel developers. Fortunately they’ve done the work for us 🙂

I wonder also if the Google workload assumptions apply more generally. Priority bands, reservations, and preemption work well for Google, but our customers almost all use the fair share scheduler. Yahoo uses the capacity scheduler. Twitter uses the fair scheduler. I haven’t heard of any demand or usage of a priority + reservation scheduler.

Finally, very few of our customers run big shared clusters as envisioned at Google. We have customers with thousands of nodes, but this is split up into pods of hundreds of nodes. It’s also still common to have separate clusters for separate users or applications. Clusters are also typically homogeneous in terms of hardware. I think this will begin to change though, and soon.

[Reposted from] http://umbrant.com/blog/2015/mesos_omega_borg_survey.html

Apache Hadoop YARN

YARN Architecture

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.

The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc.

The Scheduler has a pluggable policy which is responsible for partitioning the cluster resources among the various queues, applications etc. The current schedulers such as the CapacityScheduler and the FairScheduler would be some examples of plug-ins.

The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MapReduce in hadoop-2.x maintains API compatibility with previous stable release (hadoop-1.x). This means that all MapReduce jobs should still run unchanged on top of YARN with just a recompile.

Capacity Scheduler

Purpose

This document describes the CapacityScheduler, a pluggable scheduler for Hadoop which allows for multiple-tenants to securely share a large cluster such that their applications are allocated resources in a timely manner under constraints of allocated capacities.

Overview

The CapacityScheduler is designed to run Hadoop applications as a shared, multi-tenant cluster in an operator-friendly manner while maximizing the throughput and the utilization of the cluster.

Traditionally each organization has its own private set of compute resources that have sufficient capacity to meet the organization’s SLA under peak or near peak conditions. This generally leads to poor average utilization and the overhead of managing multiple independent clusters, one per organization. Sharing clusters between organizations is a cost-effective manner of running large Hadoop installations since this allows them to reap benefits of economies of scale without creating private clusters. However, organizations are concerned about sharing a cluster because they are worried about others using the resources that are critical for their SLAs.

The CapacityScheduler is designed to allow sharing a large cluster while giving each organization capacity guarantees. The central idea is that the available resources in the Hadoop cluster are shared among multiple organizations who collectively fund the cluster based on their computing needs. There is an added benefit that an organization can access any excess capacity not being used by others. This provides elasticity for the organizations in a cost-effective manner.

Sharing clusters across organizations necessitates strong support for multi-tenancy since each organization must be guaranteed capacity and safe-guards to ensure the shared cluster is impervious to a single rogue application or user or sets thereof. The CapacityScheduler provides a stringent set of limits to ensure that a single application or user or queue cannot consume a disproportionate amount of resources in the cluster. Also, the CapacityScheduler provides limits on initialized/pending applications from a single user and queue to ensure fairness and stability of the cluster.

The primary abstraction provided by the CapacityScheduler is the concept of queues. These queues are typically setup by administrators to reflect the economics of the shared cluster.

To provide further control and predictability on sharing of resources, the CapacityScheduler supports hierarchical queues to ensure resources are shared among the sub-queues of an organization before other queues are allowed to use free resources, there-by providing affinity for sharing free resources among applications of a given organization.

Features

The CapacityScheduler supports the following features:

  • Hierarchical Queues – Hierarchy of queues is supported to ensure resources are shared among the sub-queues of an organization before other queues are allowed to use free resources, there-by providing more control and predictability.
  • Capacity Guarantees – Queues are allocated a fraction of the capacity of the grid in the sense that a certain capacity of resources will be at their disposal. All applications submitted to a queue will have access to the capacity allocated to the queue. Administrators can configure soft limits and optional hard limits on the capacity allocated to each queue.
  • Security – Each queue has strict ACLs which control which users can submit applications to individual queues. Also, there are safe-guards to ensure that users cannot view and/or modify applications from other users. Also, per-queue and system administrator roles are supported.
  • Elasticity – Free resources can be allocated to any queue beyond its capacity. When there is demand for these resources from queues running below capacity at a future point in time, as tasks scheduled on these resources complete, they will be assigned to applications on queues running below the capacity (pre-emption is not supported). This ensures that resources are available in a predictable and elastic manner to queues, thus preventing artificial silos of resources in the cluster which helps utilization.
  • Multi-tenancy – Comprehensive set of limits are provided to prevent a single application, user and queue from monopolizing resources of the queue or the cluster as a whole to ensure that the cluster isn’t overwhelmed.
  • Operability
    • Runtime Configuration – The queue definitions and properties such as capacity, ACLs can be changed, at runtime, by administrators in a secure manner to minimize disruption to users. Also, a console is provided for users and administrators to view current allocation of resources to various queues in the system. Administrators can add additional queues at runtime, but queues cannot be deleted at runtime.
    • Drain applications – Administrators can stop queues at runtime to ensure that while existing applications run to completion, no new applications can be submitted. If a queue is in STOPPED state, new applications cannot be submitted to it or any of its child queues. Existing applications continue to completion, thus the queue can be drained gracefully. Administrators can also start the stopped queues.
  • Resource-based Scheduling – Support for resource-intensive applications, where-in an application can optionally specify higher resource-requirements than the default, there-by accommodating applications with differing resource requirements. Currently, memory is the resource requirement supported.
  • Queue Mapping based on User or Group – This feature allows users to map a job to a specific queue based on the user or group.
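The interplay of capacity guarantees, hard limits and elasticity can be illustrated with a small sketch. This is not the CapacityScheduler's implementation: the function name, the tuple layout and the first-come order in which spare capacity is handed out are assumptions made for the example, and capacities are expressed as integer percentages to keep the arithmetic exact.

```python
def allocate(total, queues):
    """Split `total` containers between queues.

    queues: {name: (guaranteed_pct, max_pct, demand)} where guaranteed_pct is
            the queue's configured capacity, max_pct its optional hard limit,
            and demand the number of containers it currently wants.
    Returns {name: containers_allocated}.
    """
    # Step 1: each queue receives its demand, up to its guaranteed capacity.
    alloc = {
        name: min(demand, guaranteed * total // 100)
        for name, (guaranteed, _max_pct, demand) in queues.items()
    }
    spare = total - sum(alloc.values())
    # Step 2 (elasticity): spare capacity goes to queues that still have
    # demand, but never beyond their hard limit.
    for name, (_guaranteed, max_pct, demand) in queues.items():
        extra = min(spare, demand - alloc[name], max_pct * total // 100 - alloc[name])
        if extra > 0:
            alloc[name] += extra
            spare -= extra
    return alloc
```

For example, with 100 containers, a queue guaranteed 50% that demands 80 can borrow the 20 containers that a lightly loaded neighbour is not using, while a queue whose hard limit equals its guarantee stays capped.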

 

 

 

Fair Scheduler

Purpose

This document describes the FairScheduler, a pluggable scheduler for Hadoop that allows YARN applications to share resources in large clusters fairly.

Introduction

Fair scheduling is a method of assigning resources to applications such that all apps get, on average, an equal share of resources over time. Hadoop NextGen is capable of scheduling multiple resource types. By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, using the notion of Dominant Resource Fairness developed by Ghodsi et al. When there is a single app running, that app uses the entire cluster. When other apps are submitted, resources that free up are assigned to the new apps, so that each app eventually gets roughly the same amount of resources. Unlike the default Hadoop scheduler, which forms a queue of apps, this lets short apps finish in reasonable time while not starving long-lived apps. It is also a reasonable way to share a cluster between a number of users. Finally, fair sharing can also work with app priorities – the priorities are used as weights to determine the fraction of total resources that each app should get.
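The steady state that fair sharing converges to can be illustrated with a weighted max-min computation over a single resource. The sketch below is illustrative only and is not the Fair Scheduler's code; the function name and dictionary arguments are assumptions.

```python
def fair_shares(capacity, demands, weights):
    """Weighted max-min fair shares for one resource type.

    capacity: total amount of the resource (e.g. memory in GB).
    demands:  {app: amount the app could use}.
    weights:  {app: relative weight, e.g. derived from priority}.
    """
    shares = {app: 0.0 for app in demands}
    active = set(demands)
    remaining = float(capacity)
    while active and remaining > 1e-9:
        total_weight = sum(weights[app] for app in active)
        satisfied = set()
        handed_out = 0.0
        for app in active:
            # Each unsatisfied app is offered a slice proportional to its weight.
            offered = remaining * weights[app] / total_weight
            need = demands[app] - shares[app]
            grant = min(offered, need)
            shares[app] += grant
            handed_out += grant
            if need <= offered:
                satisfied.add(app)
        remaining -= handed_out
        if not satisfied:
            break  # everyone took a full slice, so nothing is left over
        # Whatever satisfied apps did not need is redistributed next round.
        active -= satisfied
    return shares
```

With a capacity of 12 and demands of 2, 10 and 10 at equal weights, the small app receives its full 2 and the other two split the rest evenly at 5 each, which is how short apps avoid being starved.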

The scheduler organizes apps further into “queues”, and shares resources fairly between these queues. By default, all users share a single queue, named “default”. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. It is also possible to assign queues based on the user name included with the request through configuration. Within each queue, a scheduling policy is used to share resources between the running apps. The default is memory-based fair sharing, but FIFO and multi-resource with Dominant Resource Fairness can also be configured. Queues can be arranged in a hierarchy to divide resources and configured with weights to share the cluster in specific proportions.

In addition to providing fair sharing, the Fair Scheduler allows assigning guaranteed minimum shares to queues, which is useful for ensuring that certain users, groups or production applications always get sufficient resources. When a queue contains apps, it gets at least its minimum share, but when the queue does not need its full guaranteed share, the excess is split between other running apps. This lets the scheduler guarantee capacity for queues while utilizing resources efficiently when these queues don’t contain applications.

The Fair Scheduler lets all apps run by default, but it is also possible to limit the number of running apps per user and per queue through the config file. This can be useful when a user must submit hundreds of apps at once, or in general to improve performance if running too many apps at once would cause too much intermediate data to be created or too much context-switching. Limiting the apps does not cause any subsequently submitted apps to fail, only to wait in the scheduler’s queue until some of the user’s earlier apps finish.

Hierarchical queues with pluggable policies

The fair scheduler supports hierarchical queues. All queues descend from a queue named “root”. Available resources are distributed among the children of the root queue in the typical fair scheduling fashion. Then, the children distribute the resources assigned to them to their children in the same fashion. Applications may only be scheduled on leaf queues. Queues can be specified as children of other queues by placing them as sub-elements of their parents in the fair scheduler allocation file.

A queue’s name starts with the names of its parents, with periods as separators. So a queue named “queue1” under the root queue would be referred to as “root.queue1”, and a queue named “queue2” under a queue named “parent1” would be referred to as “root.parent1.queue2”. When referring to queues, the root part of the name is optional, so queue1 could be referred to as just “queue1”, and queue2 could be referred to as just “parent1.queue2”.
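As a small illustration of the naming rule, a helper that expands the short form into the fully qualified name might look like the following. The function is hypothetical and is not part of the Fair Scheduler API.

```python
def full_queue_name(name):
    """Expand a queue reference so that it always starts with 'root'."""
    if name == "root" or name.startswith("root."):
        return name
    return "root." + name
```

Both “queue1” and “root.queue1” resolve to the same queue under this rule.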

Additionally, the fair scheduler allows setting a different custom policy for each queue to allow sharing the queue’s resources in any which way the user wants. A custom policy can be built by extending org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. FifoPolicy, FairSharePolicy (default), and DominantResourceFairnessPolicy are built-in and can be readily used.

Certain add-ons which existed in the original (MR1) Fair Scheduler are not yet supported. Among them is the use of custom policies governing priority “boosting” over certain apps.

Automatically placing applications in queues

The Fair Scheduler allows administrators to configure policies that automatically place submitted applications into appropriate queues. Placement can depend on the user and groups of the submitter and the requested queue passed by the application. A policy consists of a set of rules that are applied sequentially to classify an incoming application. Each rule either places the app into a queue, rejects it, or continues on to the next rule. Refer to the allocation file format below for how to configure these policies.
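The sequential evaluation of rules can be sketched as a short chain of functions. The rule functions and the application dictionary below are hypothetical stand-ins chosen for illustration; the actual rules are configured in the allocation file, not written as code.

```python
PLACE, REJECT, CONTINUE = "place", "reject", "continue"


def specified_rule(app):
    # Place the app in the queue it asked for, if it asked for one.
    if app.get("requested_queue"):
        return PLACE, app["requested_queue"]
    return CONTINUE, None


def user_rule(app):
    # Place the app in a queue named after the submitting user.
    return PLACE, "root." + app["user"]


def reject_rule(app):
    # Terminal rule: refuse anything that earlier rules did not place.
    return REJECT, None


def place(app, rules):
    """Apply rules in order; return the chosen queue, or None if rejected."""
    for rule in rules:
        action, queue = rule(app)
        if action == PLACE:
            return queue
        if action == REJECT:
            return None
        # CONTINUE: fall through to the next rule.
    return None
```

Ordering matters: placing `reject_rule` last turns the policy into an allow-list, while ending with `user_rule` gives every submitter a fallback queue.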

 

ResourceManager Restart

Overview

ResourceManager is the central authority that manages resources and schedules applications running atop of YARN. Hence, it is potentially a single point of failure in an Apache YARN cluster. This document gives an overview of ResourceManager Restart, a feature that enhances ResourceManager to keep functioning across restarts and also makes ResourceManager down-time invisible to end-users.

ResourceManager Restart feature is divided into two phases:

  • ResourceManager Restart Phase 1 (Non-work-preserving RM restart): Enhance RM to persist application/attempt state and other credentials information in a pluggable state-store. RM will reload this information from state-store upon restart and re-kick the previously running applications. Users are not required to re-submit the applications.
  • ResourceManager Restart Phase 2 (Work-preserving RM restart): Focus on re-constructing the running state of ResourceManager by combining the container statuses from NodeManagers and container requests from ApplicationMasters upon restart. The key difference from phase 1 is that previously running applications will not be killed after RM restarts, and so applications won’t lose their work because of an RM outage.

Feature

  • Phase 1: Non-work-preserving RM restart

    As of Hadoop 2.4.0 release, only ResourceManager Restart Phase 1 is implemented which is described below.

    The overall concept is that RM will persist the application metadata (i.e. ApplicationSubmissionContext) in a pluggable state-store when a client submits an application and also save the final status of the application such as the completion state (failed, killed, finished) and diagnostics when the application completes. Besides, RM also saves the credentials like security keys, tokens to work in a secure environment. Any time RM shuts down, as long as the required information (i.e. application metadata and the alongside credentials if running in a secure environment) is available in the state-store, when RM restarts, it can pick up the application metadata from the state-store and re-submit the application. RM won’t re-submit the applications if they were already completed (i.e. failed, killed, finished) before RM went down.

    NodeManagers and clients during the down-time of RM will keep polling RM until RM comes up. When RM becomes alive, it will send a re-sync command to all the NodeManagers and ApplicationMasters it was talking to via heartbeats. As of Hadoop 2.4.0 release, the behaviors for NodeManagers and ApplicationMasters to handle this command are: NMs will kill all their managed containers and re-register with RM. From the RM’s perspective, these re-registered NodeManagers are similar to the newly joining NMs. AMs (e.g. MapReduce AM) are expected to shut down when they receive the re-sync command. After RM restarts and loads all the application metadata and credentials from state-store and populates them into memory, it will create a new attempt (i.e. ApplicationMaster) for each application that was not yet completed and re-kick that application as usual. As described before, the previously running applications’ work is lost in this manner since they are essentially killed by RM via the re-sync command on restart.

  • Phase 2: Work-preserving RM restart

    As of Hadoop 2.6.0, we further enhanced RM restart feature to address the problem to not kill any applications running on YARN cluster if RM restarts.

    Beyond all the groundwork that has been done in Phase 1 to ensure the persistency of application state and reload that state on recovery, Phase 2 primarily focuses on re-constructing the entire running state of YARN cluster, the majority of which is the state of the central scheduler inside RM which keeps track of all containers’ life-cycle, applications’ headroom and resource requests, queues’ resource usage etc. In this way, RM doesn’t need to kill the AM and re-run the application from scratch as it is done in Phase 1. Applications can simply re-sync back with RM and resume from where it were left off.

    RM recovers its running state by taking advantage of the container statuses sent from all NMs. An NM does not kill its containers when it re-syncs with the restarted RM. It continues managing the containers and sends the container statuses to RM when it re-registers. RM reconstructs the container instances and the associated applications’ scheduling status by absorbing this container information. In the meantime, the AM needs to re-send its outstanding resource requests to RM, because RM may lose the unfulfilled requests when it shuts down. Application writers using the AMRMClient library to communicate with RM do not need to worry about the AM re-sending resource requests to RM on re-sync, as this is taken care of automatically by the library itself.
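
The two recovery phases can be illustrated with a small Python sketch. This is not RM code: the dictionary standing in for the state-store, the tuple layout of the container reports, and the function names apps_to_resubmit and rebuild_usage are all assumptions made for illustration. The first function picks the applications that a Phase 1 restart would re-submit; the second rebuilds per-application memory usage from container statuses, in the spirit of Phase 2.

```python
from collections import defaultdict

# Final states named in the text; completed applications are not re-submitted.
COMPLETED_STATES = {"FAILED", "KILLED", "FINISHED"}


def apps_to_resubmit(state_store):
    """Phase 1: pick the applications a restarted RM would re-submit.

    state_store (hypothetical layout) maps an application id to its saved
    record; "final_state" stays None until the application completes.
    """
    return sorted(
        app_id
        for app_id, record in state_store.items()
        if record["final_state"] not in COMPLETED_STATES
    )


def rebuild_usage(container_reports):
    """Phase 2: rebuild per-application memory usage from NM reports.

    Each report is an (app_id, container_id, memory_mb) tuple, a stand-in
    for the container statuses NMs send when they re-register.
    """
    usage = defaultdict(int)
    for app_id, _container_id, memory_mb in container_reports:
        usage[app_id] += memory_mb
    return dict(usage)


# Illustrative data only.
state_store = {
    "app_1": {"final_state": None},
    "app_2": {"final_state": "FINISHED"},
    "app_3": {"final_state": None},
}
reports = [
    ("app_1", "c_1", 1024),
    ("app_1", "c_2", 2048),
    ("app_3", "c_3", 512),
]

print(apps_to_resubmit(state_store))  # ['app_1', 'app_3']
print(rebuild_usage(reports))         # {'app_1': 3072, 'app_3': 512}
```

In the sketch, app_2 is skipped because it had already finished, and the usage of app_1 is recovered purely from what the nodes report, without re-running anything.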

 

ResourceManager High Availability

Introduction

This guide provides an overview of High Availability of YARN’s ResourceManager, and details how to configure and use this feature. The ResourceManager (RM) is responsible for tracking the resources in a cluster, and scheduling applications (e.g., MapReduce jobs). Prior to Hadoop 2.4, the ResourceManager was the single point of failure in a YARN cluster. The High Availability feature adds redundancy in the form of an Active/Standby ResourceManager pair to remove this otherwise single point of failure.

Architecture

Overview of ResourceManager High Availability

RM Failover

ResourceManager HA is realized through an Active/Standby architecture: at any point in time, one of the RMs is Active, and one or more RMs are in Standby mode waiting to take over should anything happen to the Active. The trigger to transition-to-active comes either from the admin (through the CLI) or from the integrated failover-controller when automatic-failover is enabled.

Manual transitions and failover

When automatic failover is not enabled, admins have to manually transition one of the RMs to Active. To fail over from one RM to the other, they are expected to first transition the Active-RM to Standby and then transition a Standby-RM to Active. All this can be done using the “yarn rmadmin” CLI.
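
As a minimal sketch of that ordering, the Python helper below builds the two “yarn rmadmin” invocations without running them. The RM ids rm1 and rm2 and the helper name manual_failover_commands are assumptions for illustration; real ids come from the cluster's yarn-site.xml.

```python
def manual_failover_commands(active_id, standby_id):
    """Return the rmadmin invocations in the order described above.

    The current Active is moved to Standby first, and only then is the
    chosen Standby promoted, so two RMs are never Active at once.
    """
    return [
        ["yarn", "rmadmin", "-transitionToStandby", active_id],
        ["yarn", "rmadmin", "-transitionToActive", standby_id],
    ]


# rm1 and rm2 are hypothetical RM ids.
for argv in manual_failover_commands("rm1", "rm2"):
    print(" ".join(argv))
```

Each list could be handed to subprocess.run on a host where the yarn CLI is installed; the sketch only prints the commands so the order can be reviewed first.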

Automatic failover

The RMs have an option to embed the ZooKeeper-based ActiveStandbyElector to decide which RM should be the Active. When the Active goes down or becomes unresponsive, another RM is automatically elected to be the Active, which then takes over. Note that there is no need to run a separate ZKFC daemon, as is the case for HDFS, because the ActiveStandbyElector embedded in the RMs acts as both failure detector and leader elector.

Client, ApplicationMaster and NodeManager on RM failover

When there are multiple RMs, the configuration (yarn-site.xml) used by clients and nodes is expected to list all the RMs. Clients, ApplicationMasters (AMs) and NodeManagers (NMs) try connecting to the RMs in a round-robin fashion until they hit the Active RM. If the Active goes down, they resume the round-robin polling until they hit the “new” Active. This default retry logic is implemented as org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider. You can override the logic by implementing org.apache.hadoop.yarn.client.RMFailoverProxyProvider and setting the value of yarn.client.failover-proxy-provider to the class name.
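
The round-robin idea can be sketched in a few lines of Python. This illustrates the retry pattern only and is not the code of ConfiguredRMFailoverProxyProvider; the function find_active, the is_active probe, and the RM ids rm1 and rm2 are hypothetical.

```python
from itertools import cycle, islice


def find_active(rm_ids, is_active, max_attempts):
    """Try RMs in round-robin order and return the first Active one.

    is_active is a caller-supplied probe (hypothetical). Returns None if
    no Active RM is found within max_attempts tries.
    """
    for rm_id in islice(cycle(rm_ids), max_attempts):
        if is_active(rm_id):
            return rm_id
    return None


# Illustrative cluster state: rm2 is currently the Active.
states = {"rm1": "standby", "rm2": "active"}


def probe(rm_id):
    return states[rm_id] == "active"


print(find_active(["rm1", "rm2"], probe, max_attempts=4))  # rm2

# After a failover, the same polling finds the new Active.
states.update({"rm1": "active", "rm2": "standby"})
print(find_active(["rm1", "rm2"], probe, max_attempts=4))  # rm1
```

A real client would also wait between attempts; the sketch leaves that out to keep the polling order visible.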

Recovering previous active-RM’s state

With ResourceManager Restart enabled, the RM being promoted to the active state loads the RM internal state and continues to operate from where the previous active left off, as far as the RM restart feature allows. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from both the Active and Standby RMs. Currently, there are two RMStateStore implementations for persistence: FileSystemRMStateStore and ZKRMStateStore. The ZKRMStateStore implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role. When using the ZKRMStateStore, it is also advisable NOT to set the “zookeeper.DigestAuthenticationProvider.superDigest” property on the ZooKeeper cluster, to ensure that the ZooKeeper admin does not have access to YARN application/user credential information.

 

Docker Container Executor

Overview

Docker combines an easy-to-use interface to Linux containers with easy-to-construct image files for those containers. In short, Docker launches very lightweight virtual machines.

The Docker Container Executor (DCE) allows the YARN NodeManager to launch YARN containers into Docker containers. Users can specify the Docker images they want for their YARN containers. These containers provide a custom software environment in which the user’s code runs, isolated from the software environment of the NodeManager. These containers can include special libraries needed by the application, and they can have different versions of Perl, Python, and even Java than what is installed on the NodeManager. Indeed, these containers can run a different flavor of Linux than what is running on the NodeManager. However, the YARN container must define all the environment settings and libraries needed to run the job, because nothing is shared with the NodeManager.

Docker for YARN provides both consistency (all YARN containers will have the same software environment) and isolation (no interference with whatever is installed on the physical machine).

Cluster Configuration

Docker Container Executor runs in non-secure mode of HDFS and YARN. It will not run in secure mode, and will exit if it detects secure mode.

The DockerContainerExecutor requires the Docker daemon to be running on the NodeManagers, and the Docker client to be installed and able to start Docker containers. To prevent timeouts while starting jobs, the Docker images to be used by a job should already be downloaded on the NodeManagers. Here’s an example of how this can be done:

sudo docker pull sequenceiq/hadoop-docker:2.4.1

This should be done as part of the NodeManager startup.

The following properties must be set in yarn-site.xml:

<property>
  <name>yarn.nodemanager.docker-container-executor.exec-name</name>
  <value>/usr/bin/docker</value>
  <description>
     Name or path to the Docker client. This is a required parameter. If this is empty,
     user must pass an image name as part of the job invocation (see below).
  </description>
</property>

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
  <description>
     This is the container executor setting that ensures that all
     jobs are started with the DockerContainerExecutor.
  </description>
</property>

Administrators should be aware that DCE doesn’t currently provide user namespace isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. Put differently, DCE currently provides no better security guarantees than YARN’s Default Container Executor. In fact, DockerContainerExecutor will exit if it detects secure YARN.

Tips for connecting to a secure docker repository

By default, docker images are pulled from the docker public repository. The format of a docker image url is: username/image_name. For example, sequenceiq/hadoop-docker:2.4.1 is an image in the docker public repository that contains Java and Hadoop.

If you want to use your own private repository, you provide the repository url instead of your username. The image url then becomes: private_repo_url/image_name. For example, if your repository is on localhost:8080, your image names would look like: localhost:8080/hadoop-docker

To connect to a secure docker repository, you can use the following invocation:

    docker login [OPTIONS] [SERVER]

    Register or log in to a Docker registry server, if no server is specified
    "https://index.docker.io/v1/" is the default.

  -e, --email=""       Email
  -p, --password=""    Password
  -u, --username=""    Username

If you want to log in to a self-hosted registry, you can specify this by adding the server name.

docker login <private_repo_url>

This needs to be run as part of the NodeManager startup, or as a cron job if the login session expires periodically. You can log in to multiple docker repositories from the same NodeManager, but all your users will have access to all your repositories, because at present the DockerContainerExecutor does not support per-job docker login.

Job Configuration

Currently you cannot configure any of the Docker settings with the job configuration. You can provide Mapper, Reducer, and ApplicationMaster environment overrides for the docker images, using the following 3 JVM properties respectively (only for MR jobs):

  • mapreduce.map.env: You can override the mapper’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.
  • mapreduce.reduce.env: You can override the reducer’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.
  • yarn.app.mapreduce.am.env: You can override the ApplicationMaster’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.
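
As a rough sketch of how these overrides end up on a command line, the Python helper below turns a per-role image choice into -D options. The role labels mapper, reducer, and am, the helper name image_override_options, and the image name used in the demo are assumptions for illustration; the property names are the ones listed above.

```python
IMAGE_KEY = "yarn.nodemanager.docker-container-executor.image-name"

# Hypothetical role labels mapped to the three properties listed above.
ENV_PROPERTIES = {
    "mapper": "mapreduce.map.env",
    "reducer": "mapreduce.reduce.env",
    "am": "yarn.app.mapreduce.am.env",
}


def image_override_options(images):
    """Turn {"mapper": image, ...} into a list of -D options."""
    return [
        f"-D{ENV_PROPERTIES[role]}={IMAGE_KEY}={image}"
        for role, image in images.items()
    ]


# your_image_name is a placeholder, as in the list above.
for option in image_override_options({"mapper": "your_image_name",
                                      "am": "your_image_name"}):
    print(option)
```

Roles that are left out of the dictionary get no override option at all.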

Docker Image Requirements

The Docker Images used for YARN containers must meet the following requirements:

The distro and version of Linux in your Docker Image can be quite different from that of your NodeManager. (Docker does have a few limitations in this regard, but you’re not likely to hit them.) However, if you’re using the MapReduce framework, then your image will need to be configured for running Hadoop. Java must be installed in the container, and the following environment variables must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME, HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR.
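
One way to check an image against this list is to dump its environment (for example with docker run --rm image_name env) and compare the result with the required names. The Python sketch below does only the comparison. The helper names parse_env_output and missing_env_vars and the sample listing are assumptions for illustration, not part of Hadoop or Docker.

```python
REQUIRED_ENV_VARS = (
    "JAVA_HOME",
    "HADOOP_COMMON_PATH",
    "HADOOP_HDFS_HOME",
    "HADOOP_MAPRED_HOME",
    "HADOOP_YARN_HOME",
    "HADOOP_CONF_DIR",
)


def parse_env_output(text):
    """Parse KEY=VALUE lines (as printed by env) into a dictionary."""
    env = {}
    for line in text.splitlines():
        name, sep, value = line.partition("=")
        if sep:  # skip lines that carry no '='
            env[name] = value
    return env


def missing_env_vars(env):
    """Return the required variables that are absent or empty in env."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


# Made-up listing for illustration; real values depend on the image.
sample_output = "JAVA_HOME=/usr/java/default\nHADOOP_CONF_DIR=/etc/hadoop\n"
print(missing_env_vars(parse_env_output(sample_output)))
```

An empty list means that every variable named above is defined in the listing that was checked.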

Working example of yarn launched docker containers

The following example shows how to run teragen using DockerContainerExecutor.

Step 1. First ensure that YARN is properly configured with DockerContainerExecutor (see above).

<property>
  <name>yarn.nodemanager.docker-container-executor.exec-name</name>
  <value>docker -H=tcp://0.0.0.0:4243</value>
  <description>
     Name or path to the Docker client. The tcp socket must be
     where docker daemon is listening.
  </description>
</property>

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
  <description>
     This is the container executor setting that ensures that all
     jobs are started with the DockerContainerExecutor.
  </description>
</property>

Step 2. Pick a custom Docker image if you want. In this example, we’ll use sequenceiq/hadoop-docker:2.4.1 from the docker hub repository. It has jdk, hadoop, and all the previously mentioned environment variables configured.

Step 3. Run.

hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
  teragen \
  -Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
  -Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
  1000 \
  teragen_out_dir

Once it succeeds, you can check the YARN debug logs to verify that docker has indeed launched containers.