走近伏羲,谈5000节点集群调度与性能优化

阿里巴巴分布式调度系统被命名为“伏羲”,主要负责管理集群的机器资源和调度并发的计算任务,为上层分布式应用提供稳定、高效、安全的资源管理和任务调度服务。5K项目是一场全方位战役,给伏羲系统带来规模、性能、稳定、运维等多方面的技术挑战。我们从架构设计、实现细节和模块依赖等多维度入手,做了大量优化工作来规避性能陷阱,为用户构筑可用可靠的云计算引擎,进一步降低成本,挖掘数据价值。

     5K项目是飞天平台的里程碑,系统在规模、性能和容错方面都得到了飞跃式的发展,达到世界领先水平。伏羲作为飞天平台的分布式调度系统,能支持单集群5000节点,并发运行10000作业,30分钟完成100TB数据Terasort,性能是当时Yahoo ! 在Sort Benchmark上世界纪录的两倍。

伏羲介绍

“飞天”是阿里巴巴的云计算平台,其中的分布式调度系统被命名为“伏羲”(代码名称Fuxi),名字来自我国古代神话人物。伏羲主要负责管理集群的机器资源和调度并发的计算任务,目前支持离线数据处理(DAG Job)和在线服务(Service),为上层分布式应用如ODPS / OSS / OTS提供稳定、高效、安全的资源管理和任务调度服务,为阿里巴巴集团打造数据分享第一平台的目标提供了强大的计算引擎。

伏羲系统设计上采用M / S架构(如图1所示),系统有一个被称为“伏羲Master”的集群控制中心,其余每台机器上会运行一个叫做“伏羲Agent”的守护进程,守护进程除了管理节点上运行的任务外,还负责收集该节点上的资源使用情况,并将之汇报给控制中心。控制中心与伏羲Agent之间使用心跳机制,以监测节点健康状态。当用户向伏羲Master提交一个任务时,伏羲Master会调度出一个可用节点在其上启动任务的主控进程AppMaster,主控进程随后会向伏羲Master提出资源请求,得到伏羲Master分配的资源后,AppMaster通知相应节点上的伏羲Agent开始运行任务Worker。伏羲是一个支持多任务并发的调度系统,控制中心伏羲Master负责在多个任务之间仲裁,支持优先级、资源Quota配额和抢占。

使用伏羲,用户可以运行常见的MapReduce任务,还可以托管在线服务,满足不同应用场景的需求。多用户可以共享集群,伏羲支持配置分组的资源配额,限定每个用户组可以使用的计算资源。紧急任务如重要数据报表可以提高任务优先级来优先使用计算资源。

5K带来的挑战

在5K项目攻坚过程中,我们看到大型云计算平台从设计到实现每一步都可能存在性能“陷阱”,原因主要在三个方面:规模放大效应,当系统扩展到数千节点时,原本非瓶颈与规模成正比的环节,其影响会被放大;木桶效应,很多时候,系统中99 % 的地方都被优化过,完成剩下1 % 的优化看起来也只是“锦上添花”,然而那1 % 很可能就会成为影响系统性能的致命的瓶颈;长路径模块依赖,有些请求处理过程可能需要跨越多个模块(包括外部模块),而外部模块性能的不稳定性最终可能会影响到这个请求的处理性能和稳定性。

5K项目是一场全方位战役,给伏羲系统带来规模、性能、稳定、运维等多方面的技术挑战,例如下面的性能“陷阱”。

■通信消息DDoS:在5000规模的集群中,不同进程之间的RPC请求数量会随规模猛增,网络中总请求数可达10000 QPS,极易造成系统中单点进程的消息拥塞,从而导致请求处理严重超时。另外消息处理还存在队头阻塞(HoL)问题。

■关键函数OPS:伏羲Master是资源调度的中心节点,内部关键调度函数的OPS必须达到极高的标准,否则就可能因为木桶效应影响到集群整体的调度性能。

■故障恢复对外部模块依赖:伏羲Master具有对用户透明的故障恢复功能(Failover),其恢复过程依赖写在Nuwa上的Checkpoint(注:Nuwa是飞天平台的协同系统,如名字服务)。因此,整体恢复速度会受到Nuwa访问速度的影响。

我们做了大量伏羲优化工作来规避上述的性能“陷阱”,涉及到架构设计、实现细节和模块依赖,透过现象看本质,从最底层性能分析入手一步步找到瓶颈。下面结合具体的实战例子来分享优化过程。

伏羲优化实战

通信性能优化

在5K项目初期阶段,我们测试大规模并发作业时发现,当作业数量超过1000时就容易出现运行时间变长的现象。分析监控曲线和日志,我们发现AppMaster发给伏羲Master的资源请求出现大量消息超时,AppMaster迟迟拿不到资源,资源请求处理的延时很高。

消息从到达伏羲Master进程到最终被处理返回的总时间主要包括在队列中等待时间和实际处理的时间,因此延时高无非是两个原因:消息处理本身的OPS下降;消息堆积在待处理队列中未被及时处理。顺着这一思路,在通过Profiling发现伏羲Master资源调度关键函数并没有占到整个消息处理延时的大部分后,罪魁祸首就只剩下消息堆积了。在绘出了伏羲Master中资源调度消息队列中消息堆积的曲线之后,果然发现当作业数量增加时,堆积的请求数量剧增(如图2所示),每一条请求的处理时间也较小规模时高出很多。

为什么在伏羲Master队列中会堆积如此多的消息?在伏羲系统中,守护进程伏羲Agent和AppMaster都需要向负责资源调度的伏羲Master查询资源状态,在通信策略上采用了定期Polling的方式,缺省是每秒查询一次。采用Polling通信方式主要基于其简单性,能比较鲁棒地应对网络故障,消息传递发送过程比较自然有规律。然而在5000规模集群中,这个策略必须进行调整优化,否则会造成伏羲Master被大量请求“DDoS攻击”而无法服务。

定位到消息堆积的问题后,我们立即对消息通信策略进行了流控,算法简单有效:发送端检查如果上次询问的请求结果已经返回,表明目前伏羲Master请求处理较为顺畅,则间隔一个较短的时间后进行下一次询问。反之,如果上次询问的请求超时,说明伏羲Master较忙(例如有任务释放大批资源待处理等),发送端则等待较长时间后再发送请求。通过这种自适应流控的通信策略调整,伏羲Master消息堆积问题得到了有效解决。

此外,我们还解决了伏羲Master消息的队头阻塞(HoL)问题。AppMaster需要与伏羲Master通信获得资源调度结果,同时也与伏羲Agent通信进行Worker的启停。由于伏羲Agent数量远大于伏羲Master,在极端情况下,如果AppMaster采用同一个线程池来处理这些消息,那么伏羲Master消息会被前面大量的伏羲Agent消息阻塞。我们将消息处理的全路径包括从发送到处理完毕等各个时间段进行了Profling,结果印证了队头阻塞现象。当一个任务的Worker较多时,AppMaster需要与之通信的伏羲Agent也会增多,观察到AppMaster拿到资源的时间明显变长。针对队头阻塞问题,我们通信组件中加入了独立线程功能达到QoS的效果,并应用在AppMaster处理伏羲Master消息的通信中。如图3所示,伏羲Master的消息单独使用一个线程池,其余消息则共用另一个线程池。

通过上面的两项性能优化,伏羲系统内部的通信压力得到显著降低,提高了通信效率。AppMaster与伏羲Master之间的资源请求通信得到改善,任务提交后能很快分配到资源开始运行,提高了多并发任务场景下任务的完成速度。例如,经过这个优化,用户通过ODPS客户端对海量数据进行Ad hoc的SQL查询处理速度能得到显著提升。

关键函数优化

在5K项目中我们还重点关注系统中的关键函数性能,那里也可能藏着“陷阱”。伏羲Master在调度资源时的一个关键操作是:比较一个节点的空闲资源能否满足该节点上排队等待的所有资源请求,从而决定该资源分配给哪个任务。这个函数的调用次数会与机器规模和请求数量成正比,因此其速度对伏羲Master的调度OPS有决定性影响。

伏羲在调度资源时支持多个维度,如内存、CPU、网络、磁盘等,所有的资源和请求都用一个多维的键值对表示,例如 {Mem: 10, CPU: 50,net: 40,disk: 60}。因此,判断一个空闲资源能否满足一个资源请求的问题可以简单地抽象成多维向量的比较问题,例如R: [r1, r2, r3, r4] > Q: [q1, q2, q3, q4],其中1、2、3、4等数字表示各个维度,当且仅当R各个维度均大于Q时才判断R > Q。比较次数决定了这个操作的时间复杂度。最好情况下只需比较1次即可得出结果,如判断 [1, 10, 10, 10]大于 [2, 1, 1, 1]失败;最差需要D次(D为维度数),如判断 [10, 10, 10, 1]大于 [1, 1, 1, 10]需比较4次。在资源调度高频发生时,必须对这里的比较进行优化。

我们通过Profiling分析了系统运行时资源空闲与请求情况,在资源充足时通常值最大的维度最难满足,因此在资源调度场景我们采用基于主键的优化算法:对每个资源请求的最大值所在维度定义为该向量的主键,当有空闲资源时首先比较主键维度是否满足请求,如果在主键上满足再比较其他维度。此外,对一个节点上排队等待所有请求的主键值再求一个最小值,空闲资源如果小于该最小值则无需再比较其他请求。通过主键算法,我们大大减少了资源调度时向量比较次数,伏羲Master一次调度时间优化到几个毫秒。注意到资源请求提交后不会改变,因此计算主键的系统开销可以忽略不计。

伏羲Master关键调度性能的优化增强了系统的规模扩展能力,用户利用飞天平台能管理更大规模的集群,容纳更多的计算任务,发挥出云计算平台的成本优势。

模块依赖性能优化

伏羲Master支持故障恢复,在重启后进行故障恢复时需要从Nuwa读取所有任务的描述文件(Checkpoint)以继续运行用户任务。考虑到之前Nuwa服务在服务器端对文件内容没有做持久化,伏羲Master在读取了Checkpoint后还会再写一次Nuwa,这个回写操作性能依赖于Nuwa模块。在5000节点的集群上,名字解析压力的显著增加导致Nuwa在Server的回写操作上也出现了性能下降问题,最终通过模块依赖传递到了伏羲Master,从而影响了故障恢复的性能。经测试观察,一次Checkpoint回写就消耗70秒,这大大降低了伏羲系统的可用性。

我们对伏羲Master故障恢复进行了优化。首先,从伏羲Master的角度,在故障恢复时刚刚读取的Checkpoint内容在Nuwa服务器端是不会发生改变的,因此读取Checkpoint后没有必要回写到服务器端,只需要通知本地的Nuwa Agent让其代理即可,Agent会负责服务器宕机重启时向服务器推送本地缓存的文件内容。于是与Nuwa团队的同学合作,在Nuwa API中新增加一个只写本地的接口,这样伏羲Master规避了在故障恢复时回写Checkpoint的性能风险。优化后,在5000节点集群和并发5000任务的测试规模下,一次故障恢复中处理Checkpoint操作仅需18秒(主要时间在一次读取)。可见在分布式系统中,对外部模块的依赖哪怕只是一个RPC请求也可能是“性能陷阱”,在设计和实现时尽量避免出现在关键路径上。

故障恢复是分布式系统保证可用性必须具备的功能,经过优化,伏羲Master的快速故障恢复增强了飞天计算平台的可用性和稳定性,屏蔽了硬件故障,使用户的使用过程不受影响。

工程经验

高质量代码没有捷径可走,也不能只靠制度流程,唯有认真二字:作者认真、Reviewer认真、测试认真。

■任何一个Item,无论是解决Bug还是新增Feature,都必须在动手写代码前讨论清楚方案,Code Review不能代替方案讨论。在讨论时作者需要回答两个问题:这个解决方法真的可行吗?副作用是什么?这些讨论需要记录在Wiki或者BugFree等工具上进行跟踪。

■小步快跑,尽早提交Code Review,很多问题在这个阶段就能发现,不必等到测试中发现,代价大。

■代码Reviewer对Item有一半的责任,因此Review时不是简单过一遍字面完事的。我采用的Checklist有:是否准确反映了之前讨论好的方案;是否存在死锁、“性能陷阱”;模块化封装是否足够;函数名变量名是否规范,日志格式是否规范;注释是否足够。一段代码Review迭代10次左右是很常见的。

■一定要有针对性的测试验证。

■代码提交时关联相应的Bug和Review ID,便于后续追溯。

总结

以上和大家分享了5K项目的一些实践经验,伏羲系统在5K项目中还做了很多有意义的系统优化和技术探索,参与其中收获颇丰。性能是功能的一部分,是系统生死线而非锦上花。5K项目只是阿里云计算平台技术发展的一个开始,未来会在更大规模和更丰富计算模型等方面进一步发展,为用户构筑可用可靠的云计算引擎,进一步降低成本,挖掘数据价值。

 

[转载]https://lingyun.aliyun.com/4/tech-fuxi.html

The evolution of cluster scheduler architectures

Cluster schedulers are an important component of modern infrastructure, and have evolved significantly in the last few years. Their architecture has moved from monolithic designs to much more flexible, disaggregated and distributed designs. However, many current open-source offerings are either still monolithic, or otherwise lack key features. These features matter to real-world users, as they are required to achieve good utilization.

This post is our first in a series of posts about task scheduling on large clusters, such as those operated by internet companies like Amazon, Google, Facebook, Microsoft, or Yahoo!, but increasingly elsewhere too. Scheduling is an important topic because it directly affects the cost of operating a cluster: a poor scheduler results in low utilization, which costs money as expensive machines are left idle. High utilization, however, is not sufficient on its own: antagonistic workloads interfere with other workloads unless the decisions are made carefully.

Architectural evolution

This post discusses how scheduler architectures have evolved over the last few years, and why this happened. Figure 1 visualises the different approaches: a gray square corresponds to a machine, a coloured circle to a task, and a rounded rectangle with an “S” inside corresponds to a scheduler.0 Arrows indicate placement decisions made by schedulers, and the three colours correspond to different workloads (e.g., web serving, batch analytics, and machine learning).

(a) Monolithic scheduler. (b) Two-level scheduling. (c) Shared-state scheduling. (d) Distributed scheduling. (e) Hybrid scheduling.

Figure 1: Different cluster scheduler architectures. Gray boxes represent cluster machines, circles correspond to tasks and Si denotes scheduler i.

Many cluster schedulers – such as most high-performance computing (HPC) schedulers, the Borg scheduler, various early Hadoop schedulers and the Kubernetes scheduler – are monolithic. A single scheduler process runs on one machine (e.g., the JobTracker in Hadoop v1, and kube-scheduler in Kubernetes) and assigns tasks to machines. All workloads are handled by the same scheduler, and all tasks run through the same scheduling logic (Figure 1a). This is simple and uniform, and has led to increasingly sophisticated schedulers being developed. As an example, see the Paragon and Quasar schedulers, which use a machine learning approach to avoid negative interference between workloads competing for resources.

Most clusters run different types of applications today (as opposed to, say, just Hadoop MapReduce jobs in the early days). However, maintaining a single scheduler implementation that handles mixed (heterogeneous) workloads can be tricky, for several reasons:

  1. It is quite reasonable to expect a scheduler to treat long-running service jobs and batch analytics jobs differently.
  2. Since different applications have different needs, supporting them all keeps adding features to the scheduler, increasing the complexity of its logic and implementation.
  3. The order in which the scheduler processes tasks becomes an issue: queueing effects (e.g., head-of-line blocking) and backlog can become an issue unless the scheduler is carefully designed.

Overall, this sounds like the makings of an engineering nightmare – and the never-ending lists of feature requests that scheduler maintainers receive attests to this.1

Two-level scheduling architectures address this problem by separating the concerns of resource allocation and task placement. This allows the task placement logic to be tailored towards specific applications, but also maintains the ability to share the cluster between them. The Mesos cluster manager pioneered this approach, and YARN supports a limited version of it. In Mesos, resources are offered to application-level schedulers (which may pick and choose from them), while YARN allows the application-level schedulers to to requestresources (and receive allocations in return).2 Figure 1b shows the general idea: workload-specific schedulers (S0–S2) interact with a resource manager that carves out dynamic partitions of the cluster resources for each workload. This is a very flexible approach that allows for custom, workload-specific scheduling policies.

Yet, the separation of concerns in two-level architectures comes with a drawback: the application-level schedulers lose omniscience, i.e., they cannot see all the possible placement options any more.3 Instead, they merely see those options that correspond to resources offered (Mesos) or allocated (YARN) by the resource manager component. This has several disadvantages:

  1. Priority preemption (higher priority tasks kick out lower priority ones) becomes difficult to implement: in an offer-based model, the resources occupied by running tasks aren’t visible to the upper-level schedulers; in a request-based model, the lower-level resource manager must understand the preemption policy (which may be application-dependent).
  2. Schedulers are unable to consider interference from running workloads that may degrade resource quality (e.g., “noisy neighbours” that saturate I/O bandwidth), since they cannot see them.
  3. Application-specific schedulers care about many different aspects of the underlying resources, but their only means of choosing resources is the offer/request interface with the resource manager. This interface can easily become quite complex.

Shared-state architectures address this by moving to a semi-distributed model,4 in which multiple replicas of cluster state are independently updated by application-level schedulers, as shown in Figure 1c. After the change is applied locally, the scheduler issues an optimistically concurrent transaction to update the shared cluster state. This transaction may fail, of course: another scheduler may have made a conflicting change in the meantime.

The most prominent examples of shared-state designs are Omega at Google, and Apollo at Microsoft, as well as the Nomad container scheduler by Hashicorp. All of these materialise the shared cluster state in a single location: the “cell state” in Omega, the “resource monitor” in Apollo, and the “plan queue” in Nomad.5 Apollo differs from the other two as its shared-state is read-only, and the scheduling transactions are submitted directly to the cluster machines. The machines themselves check for conflicts and accept or reject the changes. This allows Apollo to make progress even if the shared-state is temporarily unavailable.6

A “logical” shared-state design can also be achieved without materialising the full cluster state anywhere. In this approach (somewhat similar to what Apollo does), each machine maintains its own state and sends updates to different interested agents such as schedulers, machine health monitors, and resource monitoring systems. Each machine’s local view of its state now forms a “shard” of the global shared-state.

However, shared-state architectures have some drawbacks, too: they must work with stale information (unlike a centralized scheduler), and may experience degraded scheduler performance under high contention (although this can apply to other architectures as well).

Fully-distributed architectures take the disaggregation even further: they have no coordination between schedulers at all, and use many independent schedulers to service the incoming workload, as shown in Figure 1d. Each of these schedulers works purely with its local, partial, and often out-of-date view of the cluster. Jobs can typically be submitted to any scheduler, and each scheduler may place tasks anywhere in the cluster. Unlike with two-level schedulers, there are no partitions that each scheduler is responsible for. Instead, the overall schedule and resource partitioning are emergent consequences of statistical multiplexing and randomness in workload and scheduler decisions – similar to shared-state schedulers, albeit without any central control at all.

The recent distributed scheduler movement probably started with the Sparrow paper, although the underlying concept (power of multiple random choices) first appeared in 1996. The key premise of Sparrow is a hypothesis that the tasks we run on clusters are becoming ever shorter in duration, supported by an argument that fine-grained tasks have many benefits. Consequently, the authors assume that tasks are becoming more numerous, meaning that a higher decision throughput must be supported by the scheduler. Since a single scheduler may not be able to keep up with this throughput (assumed to be a million tasks per second!), Sparrow spreads the load across many schedulers.

This makes perfect sense: and the lack of central control can be conceptually appealing, and it suits some workloads very well – more on this in a future post. For the moment, it suffices to note that since the distributed schedulers are uncoordinated, they apply significantly simpler logic than advanced monolithic, two-level, or shared-state schedulers. For example:

  1. Distributed schedulers are typically based on a simple “slot” concept that chops each machine into n uniform slots, and places up to nparallel tasks. This simplifies over the fact that tasks’ resource requirements are not uniform.
  2. They also use worker-side queues with simple service disciplines (e.g., FIFO in Sparrow), which restricts scheduling flexibility, as the scheduler can merely choose at which machine to enqueue a task.
  3. Distributed schedulers have difficulty enforcing global invariants (e.g., fairness policies or strict priority precedence), since there is no central control.
  4. Since they are designed for rapid decisions based on minimal knowledge, distributed schedulers cannot support or afford complex or application-specific scheduling policies. Avoiding interference between tasks, for example, becomes tricky.

Hybrid architectures are a recent (mostly academic) invention that seeks to address these drawbacks of fully distributed architectures by combining them with monolithic or shared-state designs. The way this typically works – e.g., in Tarcil, Mercury, and Hawk – is that there really are two scheduling paths: a distributed one for part of the workload (e.g., very short tasks, or low-priority batch workloads), and a centralized one for the rest. Figure 1e illustrates this design. The behaviour of each constituent part of a hybrid scheduler is identical to the part’s architecture described above. In practice, no hybrid schedulers have been deployed in production settings yet, however, as far as I know.

What does this mean in practice?

Discussion about the relative merits of different scheduler architectures is not merely an academic topic, although it naturally revolves around research papers. For an extensive discussion of the Borg, Mesos and Omega papers from an industry perspective, for example, seeAndrew Wang’s excellent blog post. Moreover, many of the systems discussed are deployed in production settings at large enterprises (e.g., Apollo at Microsoft, Borg at Google, and Mesos at Apple), and they have in turn inspired other systems that are available as open source projects.

These days, many clusters run containerised workloads, and consequently a variety of contained-focused “orchestration frameworks” have appeared. These are similar to what Google and others call “cluster managers”. However, there are few detailed discussions of the schedulers within these frameworks and their design principles, and they typically focus more on the user-facing scheduler APIs (e.g., this report by Armand Grillet, which compares Docker Swarm, Mesos/Marathon, and the Kubernetes default scheduler). Moreover, many users neither know what difference the scheduler architecture makes, nor which one is most suitable for their applications.

Figure 2 shows an overview of a selection of open-source orchestration frameworks, their architecture and the features supported by their schedulers. At the bottom of the table, We also include closed-source systems at Google and Microsoft for reference. The resource granularity column indicates whether the scheduler assigns tasks to fixed-size slots, or whether it allocates resources in multiple dimensions (e.g., CPU, memory, disk I/O bandwidth, network bandwidth, etc.).

Framework Architecture Resource granularity Multi-scheduler Pluggable logic Priority preemption Re-scheduling Over-subscription Resource estimation Avoid interference
O
P
E
N
Kubernetes monolithic multi-dimensional N[v1.2,DD,Issue] Y[DD] N[Issue] N[Issue] Y[DD] N N
Swarm monolithic multi-dimensional N N N[Issue] N N N N
YARN monolithic/
two-level
RAM/CPU slots Y N[app-lvl. only] N[JIRA] N N[JIRA] N N
Mesos two-level multi-dimensional Y Y[framework-lvl.] N[JIRA] N Y[v0.23,Doc] N N
Nomad shared-state multi-dimensional Y Y N[Issue] N[Issue] N[Issue] N N
Sparrow fully-distributed fixed slots Y N N N N N N
C
L
O
S
E
D
Borg monolithic[7] multi-dimensional N[7] N[7] Y Y Y Y N
Omega shared-state multi-dimensional Y Y Y Y Y Y N
Apollo shared-state multi-dimensional Y Y Y Y N N N

Figure 2: Architectural classification and feature matrix of widely-used orchestration frameworks, compared to closed-source systems.

One key aspect that helps determine an appropriate scheduler architecture is whether your cluster runs a heterogeneous (i.e., mixed) workload. This is the case, for example, when combining production front-end services (e.g., load-balanced web servers and memcached) with batch data analytics (e.g., MapReduce or Spark). Such combinations make sense in order to improve utilization, but the different applications have different scheduling needs. In a mixed setting, a monolithic scheduler likely results in sub-optimal assignments, since the logic cannot be diversified on a per-application basis. A two-level or shared-state scheduler will likely offer benefits here. 8

Most user-facing service workloads run with resource allocations sized to serve peak demand expected of each container, but in practice they typically under-utilize their allocations substantially. In this situation, being able to opportunistically over-subscribe the resources with lower-priority workloads (while maintaining QoS guarantees) is the key to an efficient cluster. Mesos is currently the only open-source system that ships support for such over-subscription, although Kubernetes has a fairly mature proposal for adding it. We should expect more activity in this space in the future, since the utilization of most clusters is still substantially lower than the 60-70% reported for Google’s Borg clusters. We will focus on resource estimation, over-subscription and efficient machine utilization in a future post in this series.

Finally, specific analytics and OLAP-style applications (for example, Dremel or SparkSQL queries) can benefit from fully-distributed schedulers. However, fully-distributed schedulers (like e.g., Sparrow) come with fairly restricted feature sets, and thus work best when the workload is homogeneous (i.e., all tasks run for roughly the same time), set-up times are low (i.e., tasks are scheduled to long-running workers, as e.g., with MapReduce application-level tasks in YARN), and task churn is very high (i.e., many scheduling decisions must be made in a short time). We will talk more about these conditions and why fully-distributed schedulers – and the distributed components of hybrid schedulers – only make sense for these applications in the next blog post in this series. For now, it suffices to observe that distributed schedulers are substantially simpler than others, and do not support multiple resource dimensions, over-subscription, or re-scheduling.

Overall, the table in Figure 2 is evidence that the open-source frameworks still have some way to go until they match the feature sets of advanced, but closed-source systems. This should serve as a call to action: as a result of missing features, utilization suffers, task performance is unpredictable, noisy neighbours cause pagers to go off, and elaborate hacks are required to coerce schedulers into supporting some user needs.

However, there are some good news: while many frameworks have monolithic schedulers today, many are also moving towards more flexible designs. Kubernetes already supports pluggable schedulers (the kube-scheduler pod can be replaced by another API-compatible scheduler pod), multiple schedulers from v1.2, and has ongoing work on “extenders” to supply custom policies. Docker Swarm may – to our understanding – also gain pluggable scheduler support in the future.

What’s next?

The next blog post in this series will look at the question of whether fully distributed architectures are the key innovation required to scalecluster schedulers further (spoiler: not necessarily). After that, we will also look at resource-fitting strategies (essential for good utilisation), and finally discuss how our Firmament scheduling platform combines many of the benefits of a shared-state architecture with the scheduling quality of monolithic schedulers and the speed of fully-distributed schedulers.

 

Correction: March 10, 2016
An earlier version of the text incorrectly reported the implementation status of some Kubernetes features. We amended the table in Figure 2and the text to clarify that scheduler extenders are implemented, and that over-subscription is supported although automatic resource estimation is not. We also added a footnote explaining that a single scheduler can serve a mixed workload, but that its complexity will be high.

Correction: March 15, 2016
An earlier version of the text suggested that YARN and Mesos are two-level designs in an equal sense. However, YARN’s application-level scheduling is substantially less powerful than Mesos’s. This is now clearer in the text, and clarified further in footnote 2.

Follow us on Twitter to find out about new posts.

 


0 – This figure simplifies things a bit: of course, in practice each machine runs more than one task, and many schedulers fit tasks in multiple resource dimensions, rather than into simple slots.

1 – As an illustrative example, kube-scheduler in Kubernetes currently has outstanding feature requests for re-scheduling (pod migration), priority preemption, and resource over-subscription in its monolithic scheduler.

2 – YARN’s approach is restricted compared to Mesos because the application-level logic cannot choose resources (unless it requests much more than it needs from the resource manager), but it can only place application-level “tasks” to pre-existing containers that represent cluster-level tasks.
This is a good fit for a system like Hadoop MapReduce, in which application-level tasks (maps and reduces) must be assigned to a dynamic collection of workers in an application-specific way (e.g., optimised for data locality and per-job). It is less suited to building a more general, multi-application scheduler on top – for example, a service scheduler like the “Marathon” framework for Mesos.
Monolithic schedulers like the Kubernetes one do not support this and rely on the application doing its own scheduling (e.g., running a Spark “worker controller” as a long-running service). Consequently, there are efforts to put Kubernetes on top of YARN via a specialYARNScheduler extension – requiring two complex systems to be administered. However, there are also long-term efforts to improve native “big data” batch processing support in Kubernetes.

3 – In the Omega paper, this problem is referred to as “information hiding”.

4 – Curiously, the literature does not appear to be quite sure in agreement about whether to consider shared-state schedulers centralized or distributed: the Hawk paper treats them as examples of distributed schedulers, while the Mercury paper refers to them as examples of a centralized architecture!

5 – Nomad actually uses a slightly different approach to Omega and Apollo: while multiple independent schedulers exist, jobs are not submitted directly to them, but instead arrive via a centralised “evaluation broker” queue.

6 – It’s worth noting that the same optimisation – taking the shared-state off the critical path to enacting scheduling decisions – can be applied to Omega, but not to Nomad (in its current design): Omega can ship deltas directly to machines and update the cell state out-of-band, while Nomad’s design is premised on the leader reconciling changes in the plan queue.

7 – The table entry reflects the original Borg, but the Borg paper and the recent ACM Queue paper note that multi-scheduler support and other features have been back-ported into from Omega into Borg.

8 – That said, having multiple schedulers is not a necessary precondition for serving mixed workloads: the Borg scheduler is a case in point that a sophisticated single scheduler can serve both long-running service and batch workloads. However, this comes at the expense of higher scheduler implementation complexity – a key motivation for Omega’s multi-scheduler design.

 

[转载]http://www.cl.cam.ac.uk/research/srg/netos/camsas/blog/2016-03-09-scheduler-architectures.html

mesos, omega, borg: a survey

Google recently unveiled one of their crown jewels of system infrastructure: Borg, their cluster scheduler. This prompted me to re-read the Mesos and Omega papers, which deal with the same topic. I thought it’d be interested to do a compare and contrast of these systems. Mesos gets credit for the groundbreaking idea of two-level scheduling, Omega improved upon this with an analogy from databases, and Borg can sort of be seen as the culmination of all these ideas.

Background

Cluster schedulers have existed long before big data. There’s a rich literature on scheduling on 1000s of cores in the HPC world, but their problem domain is simpler than what is addressed bydatacenter schedulers, meaning Mesos/Borg and their ilk. Let’s compare and contrast on a few dimensions.

Scheduling for locality

Supercomputers separate storage and compute and connect them with an approximately full-bisection bandwidth network that goes at close to memory speeds (GB/s). This means your tasks can get placed anywhere on the cluster without worrying much about locality, since all compute nodes can access data equally quickly. There are a few hyper-optimized applications that optimize for the network topology, but these are very rare.

Data center schedulers do care about locality, and in fact this is the whole point of GFS and MapReduce co-design. Back in the 2000s, network bandwidth was comparatively much more expensive than disk bandwidth. So, there was a huge economic savings by scheduling your computation tasks on the same node that held the data. This is a major scheduling constraint; whereas before you could put the task anywhere, now it needs to go on one of the three data replicas.

Hardware configuration

Supercomputers are typically composed of homogeneous nodes, i.e. they all have the same hardware specs. This is because supercomputers are typically purchased in one shot: a lab gets $x million dollars for a new one, and they spend it all upfront. Some HPC applications are optimized for the specific CPU models in a supercomputer. New technology like GPUs or co-processors are rolled out as a new cluster.

In the big data realm, clusters are primarily storage constrained, so operators are continually adding new racks with updated specs to expand cluster capacity. This means it’s typical for nodes to have different CPUs, memory capacities, number of disks, etc. Also toss in special additions like SSDs, GPUs, shingled drives. A single datacenter might need to support a broad range of applications, and all of this again imposes additional scheduling constraints.

Queue management and scheduling

When running an application on a supercomputer, you specify how many nodes you want, the queue you want to submit your job to, and how long the job will run for. Queues place different restrictions on how many resources you can request and how long your job can run for. Queues also have a priority or reservation based system to determine ordering. Since the job durations are all known, this is a pretty easy box packing problem. If the queues are long (typically true) and there’s a good mix of small jobs to backfill the space leftover from big jobs (also typical), you can achieve extremely high levels of utilization. I like to visualize this in 2D, with time as X and resource usage as Y.

As per the previous, datacenter scheduling is a more general problem. The “shape” of resource requests can be quite varied, and there are more dimensions. Jobs also do not have a set duration, so it’s hard to pre-plan queues. Thus we have more sophisticated scheduling algorithms, and the performance of the scheduler thus becomes important.

Utilization as a general rule is going to be worse (unless you’re Google; more on that later), but one benefit over HPC workloads is that MapReduce and similar can be incrementally scheduled instead of gang scheduled. HPC, we wait until all N nodes that you requested are available, then run all your tasks at once. MR can instead run its tasks in multiple waves, meaning it can still effectively use bits of leftover resources. A single MR job can also ebb and flow based on cluster demand, which avoids the need for preemption or resource reservations, and also helps with fairness between multiple users.

Mesos

Mesos predates YARN, and was designed with the problems of the original MapReduce in mind. Back then, Hadoop clusters could run only a single application: MapReduce. This made it difficult to run applications that didn’t conform to a map phase followed by a reduce phase. The biggest example here is Spark. Previously, you’d have to install a whole new set of workers and masters for Spark, which would sit alongside your MapReduce workers and masters. Hardly ideal from a utilization perspective, since they were typically statically partitioned.

Mesos addresses this problem by providing a generalized scheduler for all cluster applications. MapReduce and Spark became simply different applications using the same underlying resource sharing framework. The simplest approach would be to write a centralized scheduler, but that has a number of drawbacks:

  • API complexity. We need a single API that is a superset of all known framework scheduler APIs. This is difficult by itself. Expressing resource requests will also become very complicated.
  • Performance. 10’s of thousands of nodes and millions of tasks is a lot, especially if the scheduling problem is complex.
  • Code agility. New schedulers and new frameworks are constantly being written, with new requirements.

Instead, Mesos introduces the idea of two-level scheduling. Mesos delegates the per-application scheduling work to the applications themselves, while Mesos still remains responsible for resource distribution between applications and enforcing overall fairness. This means Mesos can be pretty thin, 10K lines of code.

Two-level scheduling happens through a novel API called resource offers, where Mesos periodically offers some resources to the application schedulers. This sounds backwards at first (the request goes from the master to the application?), but it’s actually not that strange. In MR1, the TaskTracker workers are the source of truth as to what’s running on a node. When a TT heartbeats in saying that a task has completed, the JobTracker then chooses something else to run on that TaskTracker. Scheduling decisions are triggered by what’s essentially a resource offer from the worker. In Mesos, the resource offer comes from the Mesos master instead of the slave, since Mesos is managing the cluster. Not that different.

Resource offers act as time-bounded leases for some resources. Mesos offers resources to an application based on policies like priority or fair share. The app then computes how it uses them, and tells Mesos what resources from the offer it wants. This gives the app lots of flexibility, since it can choose to run a portion of tasks now, wait for a bigger allocation later (gang scheduling), or size its tasks differently to fit what’s available. Since offers are time-bounded, it also incentivizes applications to schedule quickly.

Some concerns and how they were addressed:

  • Long tasks hogging resources. Mesos lets you reserve some resources for short tasks, killing them after a time limit. This also incentivizes using short tasks, which is good for fairness.
  • Performance isolation. Use Linux Containers (cgroups).
  • Starvation of large tasks. It’s difficult to get sole access to a node, since some other app with smaller tasks will snap it up. The fix is having a minimum offer size.

Unaddressed / unknown resolution:

  • Gang scheduling. I think this is impossible to do with high utilization without either knowing task lengths or preempting. Incrementally hoarding resources works with low utilization, but can result in deadlock.
  • Cross-application preemption is also hard. The resource offer API has no way of saying “here are some low-priority tasks I could kill if you want them”. Mesos depends on tasks being short to achieve fairness.

Omega

Omega is sort of a successor to Mesos, and in fact shares an author. Since the paper uses simulated results for its evaluation, I suspect it never went into production at Google, and the ideas were rolled into the next generation of Borg. Rewriting the API is probably too invasive of a change, even for Google.

Omega takes the resource offers one degree further. In Mesos, resource offers are pessimistic or exclusive. If a resource has been offered to an app, the same resource won’t be offered to another app until the offer times out. In Omega, resource offers are optimistic. Every application is offered all the available resources on the cluster, and conflicts are resolved at commit time. Omega’s resource manager is essentially just a relational database of all the per-node state with different types of optimistic concurrency control to resolve conflicts. The upside of this is vastly increased scheduler performance (full parallelism) and better utilization.

The downside of all this is that applications are in a free-for-all where they are allowed to gobble up resources as fast as they want, and even preempt other users. This is okay for Google because they use a priority-based system, and can go yell at their internal users. Their workload broadly falls into just two priority bands: high-priority service jobs (HBase, webservers, long-lived services) and low-priority batch jobs (MapReduce and similar). Applications are allowed to preempt lower-priority jobs, and are also trusted to stay within their cooperatively enforced limits on # of submitted jobs, amount of allocated resources, etc. I think Yahoo has said differently about being able to go yell at users (certainly not scalable), but it works somehow at Google.

Most of the paper talks about how this optimistic allocation scheme works with conflicts, which is always the question. There are a few high-level notes:

  • Service jobs are larger, and have more rigorous placement requirements for fault-tolerance (spread across racks).
  • Omega can probably scale up to 10s but not 100s of schedulers, due to the overhead of distributing the full cluster state.
  • Scheduling times of a few seconds is typical. They also compare up to 10s and 100s of seconds, which is where the benefits of two-level scheduling really kick in. Not sure how common this is, maybe for service jobs?
  • Typical cluster utilization is about 60%.
  • Conflicts are rare enough that OCC works in practice. They were able to go up to 6x their normal batch workload before the scheduler fell apart.
  • Incremental scheduling is very important. Gang-scheduling is significantly more expensive to implement due to increased conflicts. Apparently most applications can do incremental okay, and can just do a couple partial allocations to get up to their total desired amount.
  • Even for complicated schedulers (10s per-job overheads), Omega can still schedule a mixed workload with reasonable wait times.
  • Experimenting with a new MapReduce scheduler was empirically easy with Omega

Open questions

  • At some point, optimistic concurrency control breaks down because of a high conflict rate and the duplicated work from retries. It seems like they won’t run into this in practice, but I wonder if there are worst-case scenarios with oddly-shaped tasks. Is this affected by the mix of service and batch jobs? Is this something that is tuned in practice?
  • Is a lack of global policies really acceptable? Fairness, preemption, etc.
  • What’s the scheduling time like for different types of jobs? Have people written very complicated schedulers?

Borg

This is a production experience paper. It’s the same workload as Omega since it’s also Google, so many of the metapoints are the same.

High-level

  • Everything runs within Borg, including the storage systems like CFS and BigTable.
  • Median cluster size is 10K nodes, though some are much bigger.
  • Nodes can be very heterogeneous.
  • Linux process isolation is used (essentially containers), since Borg predates modern virtual machine infrastructure. Efficiency and launch time were important.
  • All jobs are statically linked binaries.
  • Very complicated, very rich resource specification language available
  • Can rolling update running jobs, meaning configuration and binary. This sometimes requires a task restart, so fault-tolerance is important.
  • Support for “graceful stop” via SIGTERM before final kill via SIGKILL. The soft kill is optional, and can not be relied on for correctness.

Allocs

  • Resource allocation is separated from process liveness. An alloc can be used for task grouping or to hold resources across task restarts.
  • An alloc set is a group of allocs on multiple machines. Multiple jobs can be run within a single alloc.
  • This is actually a pretty common pattern! Multi-process is useful to separate concerns and development.

Priorities and quotas

  • Two priority bands: high and low for service and batch.
  • Higher priority jobs can preempt lower priority
  • High priority jobs cannot preempt each other (prevents cascading livelock situations)
  • Quotas are used for admission control. Users pay more for quota at higher priorities.
  • Also provide a “free” tier that runs at lowest priority, to encourage high utilization and backfill work.
  • This is a simple and easy to understand system!

Scheduling

  • Two phases to scheduling: finding feasible nodes, then scoring these nodes for final placement.
  • Feasibility is heavily determined by task constraints.
  • Scoring is mostly determined by system properties, like best-fit vs. worst-fit, job mix, failure domains, locality, etc.
  • Once final nodes are chosen, Borg will preempt to fit if necessary.
  • Typical scheduling time is around 25s, because of localizing dependencies. Downloading the binaries is 80% of this. This locality matters. Torrent and tree protocols are used to distribute binaries.

Scalability

  • Centralization has not been an impossible performance bottleneck.
  • 10s of thousands of nodes, 10K tasks per minute scheduling rate.
  • Typical Borgmaster uses 10-14 cores and 50GB of RAM.
  • Architecture has become more and more multi-process over time, with reference to Omega and two-level scheduling.
  • Single master Borgmaster, but some responsibilities are still sharded: state updates from workers, read-only RPCs.
  • Some obvious optimizations: cache machine scores, compute feasibility once per task type, don’t attempt global optimality when making scheduling decisions.
  • Primary argument against bigger cells is isolation from operator errors and failure propagation. Architecture keeps scaling fine

Utilization

  • Their primary metric was cell compaction, or the smallest cluster that can still fit a set of tasks. Essentially box packing.
  • Big gains from the following: not segregating workloads or users, having big shared clusters, fine-grained resource requests.
  • Optimistic overcommit on a per-Borglet basis. Borglets do resource estimation, and backfill non-prod work. If the estimation is incorrect, kill off the non-prod work. Memory is the inelastic resource.
  • Sharing does not drastically affect CPI (CPU interference), but I wonder about the effect on storage.

Lessons learned

The issues listed here are pretty much fixed in Kubernetes, their public, open-source container scheduler.

Bad:

  • Would be nice to schedule multi-job workflows rather than single joba, for tracking and management. This also requires more flexible ways of referring to components of a workflow. This is solved by attaching arbitrary key-value pairs to each task and allowing users to query against them.
  • One IP per machine. This leads to port conflicts on a single machine and complicates binding and service discovery. This is solved by Linux namespaces, IPv6, SDN.
  • Complicated specification language. Lots of knobs to turn, which makes it hard to get started as a casual user. Some work on automatically determining resource requirements.

Good:

  • Allocs are great! Allows helper services to be easily placed next to the main task.
  • Baking in services like load balancing and naming is very useful.
  • Metrics, debugging, web UIs are very important so users can solve their own problems.
  • Centralization scales up well, but need to split it up into multiple processes. Kubernetes does this from the start, meaning a nice clean API between the different scheduler components.

Closing remarks

It seems like YARN will need to draw from Mesos and Omega to scale up to the 10K node scale. YARN is still a centralized scheduler, which is the strawman for comparison in Mesos and Omega. Borg specifically mentions the need to shard to scale.

Isolation is very important to achieve high utilization without compromising SLOs. This can surface at the application layer, where apps themselves need to be design to be latency-tolerant. Think tail-at-scale request replication in BigTable. Ultimately it comes down to hardware spend vs. software spend. Running at lower utilization sidesteps this problem. Or, you can tackle it head-on through OS isolation mechanisms, resource estimation, and tuning your workload and schedulers. At Google-scale, there’s enough hardware that it makes sense to hire a bunch of kernel developers. Fortunately they’ve done the work for us 🙂

I wonder also if the Google workload assumptions apply more generally. Priority bands, reservations, and preemption work well for Google, but our customers almost all use the fair share scheduler. Yahoo uses the capacity scheduler. Twitter uses the fair scheduler. I haven’t heard of any demand or usage of a priority + reservation scheduler.

Finally, very few of our customers run big shared clusters as envisioned at Google. We have customers with thousands of nodes, but this is split up into pods of hundreds of nodes. It’s also still common to have separate clusters for separate users or applications. Clusters are also typically homogeneous in terms of hardware. I think this will begin to change though, and soon.

[转载]http://umbrant.com/blog/2015/mesos_omega_borg_survey.html

Apache Hadoop YARN

YARN Architecture

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.

The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc.

The Scheduler has a pluggable policy which is responsible for partitioning the cluster resources among the various queues, applications etc. The current schedulers such as the CapacityScheduler and the FairScheduler would be some examples of plug-ins.

The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MapReduce in hadoop-2.x maintains API compatibility with previous stable release (hadoop-1.x). This means that all MapReduce jobs should still run unchanged on top of YARN with just a recompile.

Capacity Scheduler

Purpose

This document describes the CapacityScheduler, a pluggable scheduler for Hadoop which allows for multiple-tenants to securely share a large cluster such that their applications are allocated resources in a timely manner under constraints of allocated capacities.

Overview

The CapacityScheduler is designed to run Hadoop applications as a shared, multi-tenant cluster in an operator-friendly manner while maximizing the throughput and the utilization of the cluster.

Traditionally each organization has it own private set of compute resources that have sufficient capacity to meet the organization’s SLA under peak or near peak conditions. This generally leads to poor average utilization and overhead of managing multiple independent clusters, one per each organization. Sharing clusters between organizations is a cost-effective manner of running large Hadoop installations since this allows them to reap benefits of economies of scale without creating private clusters. However, organizations are concerned about sharing a cluster because they are worried about others using the resources that are critical for their SLAs.

The CapacityScheduler is designed to allow sharing a large cluster while giving each organization capacity guarantees. The central idea is that the available resources in the Hadoop cluster are shared among multiple organizations who collectively fund the cluster based on their computing needs. There is an added benefit that an organization can access any excess capacity not being used by others. This provides elasticity for the organizations in a cost-effective manner.

Sharing clusters across organizations necessitates strong support for multi-tenancy since each organization must be guaranteed capacity and safe-guards to ensure the shared cluster is impervious to single rouge application or user or sets thereof. The CapacityScheduler provides a stringent set of limits to ensure that a single application or user or queue cannot consume disproportionate amount of resources in the cluster. Also, the CapacityScheduler provides limits on initialized/pending applications from a single user and queue to ensure fairness and stability of the cluster.

The primary abstraction provided by the CapacityScheduler is the concept of queues. These queues are typically setup by administrators to reflect the economics of the shared cluster.

To provide further control and predictability on sharing of resources, the CapacityScheduler supports hierarchical queues to ensure resources are shared among the sub-queues of an organization before other queues are allowed to use free resources, there-by providing affinity for sharing free resources among applications of a given organization.

Features

The CapacityScheduler supports the following features:

  • Hierarchical Queues – Hierarchy of queues is supported to ensure resources are shared among the sub-queues of an organization before other queues are allowed to use free resources, there-by providing more control and predictability.
  • Capacity Guarantees – Queues are allocated a fraction of the capacity of the grid in the sense that a certain capacity of resources will be at their disposal. All applications submitted to a queue will have access to the capacity allocated to the queue. Adminstrators can configure soft limits and optional hard limits on the capacity allocated to each queue.
  • Security – Each queue has strict ACLs which controls which users can submit applications to individual queues. Also, there are safe-guards to ensure that users cannot view and/or modify applications from other users. Also, per-queue and system administrator roles are supported.
  • Elasticity – Free resources can be allocated to any queue beyond it’s capacity. When there is demand for these resources from queues running below capacity at a future point in time, as tasks scheduled on these resources complete, they will be assigned to applications on queues running below the capacity (pre-emption is not supported). This ensures that resources are available in a predictable and elastic manner to queues, thus preventing artifical silos of resources in the cluster which helps utilization.
  • Multi-tenancy – Comprehensive set of limits are provided to prevent a single application, user and queue from monopolizing resources of the queue or the cluster as a whole to ensure that the cluster isn’t overwhelmed.
  • Operability
    • Runtime Configuration – The queue definitions and properties such as capacity, ACLs can be changed, at runtime, by administrators in a secure manner to minimize disruption to users. Also, a console is provided for users and administrators to view current allocation of resources to various queues in the system. Administrators can add additional queues at runtime, but queues cannot be deleted at runtime.
    • Drain applications – Administrators can stop queues at runtime to ensure that while existing applications run to completion, no new applications can be submitted. If a queue is in STOPPED state, new applications cannot be submitted to itself or any of its child queueus. Existing applications continue to completion, thus the queue can be drained gracefully. Administrators can also start the stopped queues.
  • Resource-based Scheduling – Support for resource-intensive applications, where-in a application can optionally specify higher resource-requirements than the default, there-by accomodating applications with differing resource requirements. Currently, memory is the the resource requirement supported.

 

  • Queue Mapping based on User or Group – This feature allows users to map a job to a specific queue based on the user or group.

 

 

 

Fair Scheduler

Purpose

This document describes the FairScheduler, a pluggable scheduler for Hadoop that allows YARN applications to share resources in large clusters fairly.

Introduction

Fair scheduling is a method of assigning resources to applications such that all apps get, on average, an equal share of resources over time. Hadoop NextGen is capable of scheduling multiple resource types. By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, using the notion of Dominant Resource Fairness developed by Ghodsi et al. When there is a single app running, that app uses the entire cluster. When other apps are submitted, resources that free up are assigned to the new apps, so that each app eventually on gets roughly the same amount of resources. Unlike the default Hadoop scheduler, which forms a queue of apps, this lets short apps finish in reasonable time while not starving long-lived apps. It is also a reasonable way to share a cluster between a number of users. Finally, fair sharing can also work with app priorities – the priorities are used as weights to determine the fraction of total resources that each app should get.

The scheduler organizes apps further into “queues”, and shares resources fairly between these queues. By default, all users share a single queue, named “default”. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. It is also possible to assign queues based on the user name included with the request through configuration. Within each queue, a scheduling policy is used to share resources between the running apps. The default is memory-based fair sharing, but FIFO and multi-resource with Dominant Resource Fairness can also be configured. Queues can be arranged in a hierarchy to divide resources and configured with weights to share the cluster in specific proportions.

In addition to providing fair sharing, the Fair Scheduler allows assigning guaranteed minimum shares to queues, which is useful for ensuring that certain users, groups or production applications always get sufficient resources. When a queue contains apps, it gets at least its minimum share, but when the queue does not need its full guaranteed share, the excess is split between other running apps. This lets the scheduler guarantee capacity for queues while utilizing resources efficiently when these queues don’t contain applications.

The Fair Scheduler lets all apps run by default, but it is also possible to limit the number of running apps per user and per queue through the config file. This can be useful when a user must submit hundreds of apps at once, or in general to improve performance if running too many apps at once would cause too much intermediate data to be created or too much context-switching. Limiting the apps does not cause any subsequently submitted apps to fail, only to wait in the scheduler’s queue until some of the user’s earlier apps finish.

Hierarchical queues with pluggable policies

The fair scheduler supports hierarchical queues. All queues descend from a queue named “root”. Available resources are distributed among the children of the root queue in the typical fair scheduling fashion. Then, the children distribute the resources assigned to them to their children in the same fashion. Applications may only be scheduled on leaf queues. Queues can be specified as children of other queues by placing them as sub-elements of their parents in the fair scheduler allocation file.

A queue’s name starts with the names of its parents, with periods as separators. So a queue named “queue1” under the root queue, would be referred to as “root.queue1”, and a queue named “queue2” under a queue named “parent1” would be referred to as “root.parent1.queue2”. When referring to queues, the root part of the name is optional, so queue1 could be referred to as just “queue1”, and a queue2 could be referred to as just “parent1.queue2”.

Additionally, the fair scheduler allows setting a different custom policy for each queue to allow sharing the queue’s resources in any which way the user wants. A custom policy can be built by extending org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. FifoPolicy, FairSharePolicy (default), and DominantResourceFairnessPolicy are built-in and can be readily used.

Certain add-ons are not yet supported which existed in the original (MR1) Fair Scheduler. Among them, is the use of a custom policies governing priority “boosting” over certain apps.

Automatically placing applications in queues

The Fair Scheduler allows administrators to configure policies that automatically place submitted applications into appropriate queues. Placement can depend on the user and groups of the submitter and the requested queue passed by the application. A policy consists of a set of rules that are applied sequentially to classify an incoming application. Each rule either places the app into a queue, rejects it, or continues on to the next rule. Refer to the allocation file format below for how to configure these policies.

 

ResourceManger Restart

Overview

ResourceManager is the central authority that manages resources and schedules applications running atop of YARN. Hence, it is potentially a single point of failure in a Apache YARN cluster. ` This document gives an overview of ResourceManager Restart, a feature that enhances ResourceManager to keep functioning across restarts and also makes ResourceManager down-time invisible to end-users.

ResourceManager Restart feature is divided into two phases:

  • ResourceManager Restart Phase 1 (Non-work-preserving RM restart): Enhance RM to persist application/attempt state and other credentials information in a pluggable state-store. RM will reload this information from state-store upon restart and re-kick the previously running applications. Users are not required to re-submit the applications.
  • ResourceManager Restart Phase 2 (Work-preserving RM restart): Focus on re-constructing the running state of ResourceManager by combining the container statuses from NodeManagers and container requests from ApplicationMasters upon restart. The key difference from phase 1 is that previously running applications will not be killed after RM restarts, and so applications won’t lose its work because of RM outage.

Feature

  • Phase 1: Non-work-preserving RM restart

    As of Hadoop 2.4.0 release, only ResourceManager Restart Phase 1 is implemented which is described below.

    The overall concept is that RM will persist the application metadata (i.e. ApplicationSubmissionContext) in a pluggable state-store when client submits an application and also saves the final status of the application such as the completion state (failed, killed, finished) and diagnostics when the application completes. Besides, RM also saves the credentials like security keys, tokens to work in a secure environment. Any time RM shuts down, as long as the required information (i.e.application metadata and the alongside credentials if running in a secure environment) is available in the state-store, when RM restarts, it can pick up the application metadata from the state-store and re-submit the application. RM won’t re-submit the applications if they were already completed (i.e. failed, killed, finished) before RM went down.

    NodeManagers and clients during the down-time of RM will keep polling RM until RM comes up. When RM becomes alive, it will send a re-sync command to all the NodeManagers and ApplicationMasters it was talking to via heartbeats. As of Hadoop 2.4.0 release, the behaviors for NodeManagers and ApplicationMasters to handle this command are: NMs will kill all its managed containers and re-register with RM. From the RM’s perspective, these re-registered NodeManagers are similar to the newly joining NMs. AMs(e.g. MapReduce AM) are expected to shutdown when they receive the re-sync command. After RM restarts and loads all the application metadata, credentials from state-store and populates them into memory, it will create a new attempt (i.e. ApplicationMaster) for each application that was not yet completed and re-kick that application as usual. As described before, the previously running applications’ work is lost in this manner since they are essentially killed by RM via the re-sync command on restart.

  • Phase 2: Work-preserving RM restart

    As of Hadoop 2.6.0, we further enhanced RM restart feature to address the problem to not kill any applications running on YARN cluster if RM restarts.

    Beyond all the groundwork that has been done in Phase 1 to ensure the persistency of application state and reload that state on recovery, Phase 2 primarily focuses on re-constructing the entire running state of YARN cluster, the majority of which is the state of the central scheduler inside RM which keeps track of all containers’ life-cycle, applications’ headroom and resource requests, queues’ resource usage etc. In this way, RM doesn’t need to kill the AM and re-run the application from scratch as it is done in Phase 1. Applications can simply re-sync back with RM and resume from where it were left off.

    RM recovers its runing state by taking advantage of the container statuses sent from all NMs. NM will not kill the containers when it re-syncs with the restarted RM. It continues managing the containers and send the container statuses across to RM when it re-registers. RM reconstructs the container instances and the associated applications’ scheduling status by absorbing these containers’ information. In the meantime, AM needs to re-send the outstanding resource requests to RM because RM may lose the unfulfilled requests when it shuts down. Application writers using AMRMClient library to communicate with RM do not need to worry about the part of AM re-sending resource requests to RM on re-sync, as it is automatically taken care by the library itself.

 

ResourceManager High Availability

Introduction

This guide provides an overview of High Availability of YARN’s ResourceManager, and details how to configure and use this feature. The ResourceManager (RM) is responsible for tracking the resources in a cluster, and scheduling applications (e.g., MapReduce jobs). Prior to Hadoop 2.4, the ResourceManager is the single point of failure in a YARN cluster. The High Availability feature adds redundancy in the form of an Active/Standby ResourceManager pair to remove this otherwise single point of failure.

Architecture

Overview of ResourceManager High Availability

RM Failover

ResourceManager HA is realized through an Active/Standby architecture – at any point of time, one of the RMs is Active, and one or more RMs are in Standby mode waiting to take over should anything happen to the Active. The trigger to transition-to-active comes from either the admin (through CLI) or through the integrated failover-controller when automatic-failover is enabled.

Manual transitions and failover

When automatic failover is not enabled, admins have to manually transition one of the RMs to Active. To failover from one RM to the other, they are expected to first transition the Active-RM to Standby and transition a Standby-RM to Active. All this can be done using the “yarn rmadmin” CLI.

Automatic failover

The RMs have an option to embed the Zookeeper-based ActiveStandbyElector to decide which RM should be the Active. When the Active goes down or becomes unresponsive, another RM is automatically elected to be the Active which then takes over. Note that, there is no need to run a separate ZKFC daemon as is the case for HDFS because ActiveStandbyElector embedded in RMs acts as a failure detector and a leader elector instead of a separate ZKFC deamon.

Client, ApplicationMaster and NodeManager on RM failover

When there are multiple RMs, the configuration (yarn-site.xml) used by clients and nodes is expected to list all the RMs. Clients, ApplicationMasters (AMs) and NodeManagers (NMs) try connecting to the RMs in a round-robin fashion until they hit the Active RM. If the Active goes down, they resume the round-robin polling until they hit the “new” Active. This default retry logic is implemented as org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider. You can override the logic by implementingorg.apache.hadoop.yarn.client.RMFailoverProxyProvider and setting the value of yarn.client.failover-proxy-provider to the class name.

Recovering prevous active-RM’s state

With the ResourceManger Restart enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence – FileSystemRMStateStore and ZKRMStateStore. The ZKRMStateStore implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role. When using the ZKRMStateStore, it is advisable to NOT set the “zookeeper.DigestAuthenticationProvider.superDigest” property on the Zookeeper cluster to ensure that the zookeeper admin does not have access to YARN application/user credential information.

 

Docker Container Executor

Overview

Docker combines an easy-to-use interface to Linux containers with easy-to-construct image files for those containers. In short, Docker launches very light weight virtual machines.

The Docker Container Executor (DCE) allows the YARN NodeManager to launch YARN containers into Docker containers. Users can specify the Docker images they want for their YARN containers. These containers provide a custom software environment in which the user’s code runs, isolated from the software environment of the NodeManager. These containers can include special libraries needed by the application, and they can have different versions of Perl, Python, and even Java than what is installed on the NodeManager. Indeed, these containers can run a different flavor of Linux than what is running on the NodeManager – although the YARN container must define all the environments and libraries needed to run the job, nothing will be shared with the NodeManager.

Docker for YARN provides both consistency (all YARN containers will have the same software environment) and isolation (no interference with whatever is installed on the physical machine).

Cluster Configuration

Docker Container Executor runs in non-secure mode of HDFS and YARN. It will not run in secure mode, and will exit if it detects secure mode.

The DockerContainerExecutor requires Docker daemon to be running on the NodeManagers, and the Docker client installed and able to start Docker containers. To prevent timeouts while starting jobs, the Docker images to be used by a job should already be downloaded in the NodeManagers. Here’s an example of how this can be done:

sudo docker pull sequenceiq/hadoop-docker:2.4.1

This should be done as part of the NodeManager startup.

The following properties must be set in yarn-site.xml:

<property>
 <name>yarn.nodemanager.docker-container-executor.exec-name</name>
  <value>/usr/bin/docker</value>
  <description>
     Name or path to the Docker client. This is a required parameter. If this is empty,
     user must pass an image name as part of the job invocation(see below).
  </description>
</property>

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
  <description>
     This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
  </description>
</property>

Administrators should be aware that DCE doesn’t currently provide user name-space isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. Put differently, DCE currently provides no better security guarantees than YARN’s Default Container Executor. In fact, DockerContainerExecutor will exit if it detects secure yarn.

Tips for connecting to a secure docker repository

By default, docker images are pulled from the docker public repository. The format of a docker image url is: username/image_name. For example, sequenceiq/hadoop-docker:2.4.1 is an image in docker public repository that contains java and hadoop.

If you want your own private repository, you provide the repository url instead of your username. Therefore, the image url becomes: private_repo_url/image_name. For example, if your repository is on localhost:8080, your images would be like: localhost:8080/hadoop-docker

To connect to a secure docker repository, you can use the following invocation:

    docker login [OPTIONS] [SERVER]

    Register or log in to a Docker registry server, if no server is specified
    "https://index.docker.io/v1/" is the default.

  -e, --email=""       Email
  -p, --password=""    Password
  -u, --username=""    Username

If you want to login to a self-hosted registry you can specify this by adding the server name.

docker login <private_repo_url>

This needs to be run as part of the NodeManager startup, or as a cron job if the login session expires periodically. You can login to multiple docker repositories from the same NodeManager, but all your users will have access to all your repositories, as at present the DockerContainerExecutor does not support per-job docker login.

Job Configuration

Currently you cannot configure any of the Docker settings with the job configuration. You can provide Mapper, Reducer, and ApplicationMaster environment overrides for the docker images, using the following 3 JVM properties respectively(only for MR jobs):

  • mapreduce.map.env: You can override the mapper’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.
  • mapreduce.reduce.env: You can override the reducer’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.
  • yarn.app.mapreduce.am.env: You can override the ApplicationMaster’s image by passing yarn.nodemanager.docker-container-executor.image-name=your_image_name to this JVM property.

Docker Image Requirements

The Docker Images used for YARN containers must meet the following requirements:

The distro and version of Linux in your Docker Image can be quite different from that of your NodeManager. (Docker does have a few limitations in this regard, but you’re not likely to hit them.) However, if you’re using the MapReduce framework, then your image will need to be configured for running Hadoop. Java must be installed in the container, and the following environment variables must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME, HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR

Working example of yarn launched docker containers

The following example shows how to run teragen using DockerContainerExecutor.

Step 1. First ensure that YARN is properly configured with DockerContainerExecutor(see above).

<property>
 <name>yarn.nodemanager.docker-container-executor.exec-name</name>
  <value>docker -H=tcp://0.0.0.0:4243</value>
  <description>
     Name or path to the Docker client. The tcp socket must be
     where docker daemon is listening.
  </description>
</property>

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
  <description>
     This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
  </description>
</property>

Step 2. Pick a custom Docker image if you want. In this example, we’ll use sequenceiq/hadoop-docker:2.4.1 from the docker hub repository. It has jdk, hadoop, and all the previously mentioned environment variables configured.

Step 3. Run.

hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
  teragen \
     -Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
   -Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
  1000 \
  teragen_out_dir

Once it succeeds, you can check the yarn debug logs to verify that docker indeed has launched containers.

 

 

深入浅出Mesos

为软件定义数据中心而生的操作系统

Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核。Mesos最初是由加州大学伯克利分校的AMPLab开发的,后在Twitter得到广泛使用。InfoQ接下来将会策划系列文章来为读者剖析Mesos。本文是整个系列的第一篇,简单介绍了Mesos的背景、历史以及架构。

我讨厌“软件定义数据中心(SDDC)”这个词,并不是因为我质疑这个概念,而是我发现很多公司都对这个词有误用,他们甚至直接把这个词拿来套用,并急于把自己定位为下一代数据中心的创新者。具体来说,我认为,在商用x86硬件上运行软件(应用)并不是什么SDDC解决方案,它也不具备虚拟化硬件到资源池的能力。真正的SDDC底层基础架构应该可以从运行于其上的应用程序中抽象出来,并根据应用程序不断变化的需求,动态且自动地分配、重新分配应用程序,然后运行于数据中心的不同组件之中。

这就是为什么我一直兴奋地要在后面介绍Mesos,一个Apache开源项目。为什么我对Mesos如此兴奋?回想x86虚拟化之初对数据中心曾经的承诺:通过增加服务器利用率使其更高效,通过从物理基础架构抽象应用使其更敏捷。虽然收获颇丰,但是以虚拟机为单位,粒度仍不够精细,如果应用程序都过于庞大,那就难以充分实现这一承诺。如今,飞速发展的容器技术、分布式应用程序和微服务技术正悄然改变着我们对数据中心的运行和管理方式。

试想,可否整合数据中心中的所有资源,并将它们放在一个大的虚拟池里,代替单独的物理服务器;然后开放诸如CPU、内存和I/O这些基本资源而不是虚拟机?同样,可否把应用程序拆分成小的、隔离的任务单位,从而根据数据中心应用的需求,从虚拟数据中心池中动态分配任务资源?就像操作系统将PC的处理器和RAM放入资源池,使其可以为不同的进程协调分配和释放资源。进一步讲,我们可以把Mesos作为操作系统内核,然后将数据中心看为PC。这也是正是我想说的:Mesos正在改变数据中心,它让真正的SDDC成为现实。

接下来我先介绍下Mesos的历史。Mesos的起源于Google的数据中心资源管理系统Borg。你可以从WIRED杂志的这篇文章中了解更多关于Borg起源的信息及它对Mesos影响。Twitter从Google的Borg系统中得到启发,然后就开发一个类似的资源管理系统来帮助他们摆脱可怕的“失败之鲸”(译者注:见上图)。后来他们注意到加州大学伯克利分校AMPLab正在开发的名为Mesos的项目,这个项目的负责人是Ben Hindman,Ben是加州大学伯克利分校的博士研究生。后来Ben Hindman加入了Twitter,负责开发和部署Mesos。现在Mesos管理着Twitter超过30,0000台服务器上的应用部署,“失败之鲸”已成往事。其他公司纷至沓来,也部署了Mesos,比如Airbnb(空中食宿网)、eBay(电子港湾)和Netflix。

Mesos是如何让Twitter和Airbnb这样的公司,通过数据中心资源更高效的管理系统,扩展应用的呢?我们从一个相当简单但很优雅的两级调度架构开始说起。

上图修改自Apache Mesos网站上的图片,如图所示,Mesos实现了两级调度架构,它可以管理多种类型的应用程序。第一级调度是Master的守护进程,管理Mesos集群中所有节点上运行的Slave守护进程。集群由物理服务器或虚拟服务器组成,用于运行应用程序的任务,比如Hadoop和MPI作业。第二级调度由被称作Framework的“组件”组成。Framework包括调度器(Scheduler)和执行器(Executor)进程,其中每个节点上都会运行执行器。Mesos能和不同类型的Framework通信,每种Framework由相应的应用集群管理。上图中只展示了Hadoop和MPI两种类型,其它类型的应用程序也有相应的Framework。

Mesos Master协调全部的Slave,并确定每个节点的可用资源,
聚合计算跨节点的所有可用资源的报告,然后向注册到Master的Framework(作为Master的客户端)发出资源邀约。Framework可以根据应用程序的需求,选择接受或拒绝来自master的资源邀约。一旦接受邀约,Master即协调Framework和Slave,调度参与节点上任务,并在容器中执行,以使多种类型的任务,比如Hadoop和Cassandra,可以在同一个节点上同时运行。

我认为,Mesos使用的两级调度架构以及算法、隔离技术让在同一个节点上运行多种不同类型的应用成为了现实,这才是数据中心的未来。正如我之前所述,这是到目前为止我所见过的,履行SDDC承诺最好的现成技术。

Mesos的体系结构和工作流

本篇文章将深入剖析Mesos的技术细节和组件间的流程,以便大家更好地理解为什么Mesos是数据中心操作系统内核的重要候选者。文中所述的大部分技术细节都来自Ben Hindman团队2010年在加州大学伯克利分校时发表的白皮书。 顺便说一句,Hindman已经离开Twitter去了Mesosphere,着手建设并商业化以Mesos为核心的数据中心操作系统。在此,我将重点放在提炼白皮书的主要观点上,然后给出一些我对相关技术所产生的价值的思考。

Mesos流程

接着上一篇文章说。并结合前述的加州大学伯克利分校的白皮书以及Apache Mesos网站,开始我们的讲述:

我们来研究下上图的事件流程。上一篇谈到,Slave是运行在物理或虚拟服务器上的Mesos守护进程,是Mesos集群的一部分。Framework由调度器(Scheduler)应用程序和任务执行器(Executor)组成,被注册到Mesos以使用Mesos集群中的资源。

  • Slave 1向Master汇报其空闲资源:4个CPU、4GB内存。然后,Master触发分配策略模块,得到的反馈是Framework 1要请求全部可用资源。
  • Master向Framework 1发送资源邀约,描述了Slave 1上的可用资源。
  • Framework的调度器(Scheduler)响应Master,需要在Slave上运行两个任务,第一个任务分配<2 CPUs, 1 GB RAM>资源,第二个任务分配<1 CPUs, 2 GB RAM>资源。
  • 最后,Master向Slave下发任务,分配适当的资源给Framework的任务执行器(Executor),接下来由执行器启动这两个任务(如图中虚线框所示)。 此时,还有1个CPU和1GB的RAM尚未分配,因此分配模块可以将这些资源供给Framework 2。

资源分配

为了实现在同一组Slave节点集合上运行多任务这一目标,Mesos使用了隔离模块, 该模块使用了一些应用和进程隔离机制来运行这些任务。 不足为奇的是,虽然可以使用虚拟机隔离实现隔离模块,但是Mesos当前模块支持的是容器隔离。 Mesos早在2009年就用上了Linux的容器技术,如cgroups和Solaris Zone,时至今日这些仍然是默认的。 然而,Mesos社区增加了Docker作为运行任务的隔离机制。 不管使用哪种隔离模块,为运行特定应用程序的任务,都需要将执行器全部打包,并在已经为该任务分配资源的Slave服务器上启动。 当任务执行完毕后,容器会被“销毁”,资源会被释放,以便可以执行其他任务。

我们来更深入地研究一下资源邀约和分配策略,因为这对Mesos管理跨多个Framework和应用的资源,是不可或缺的。 我们前面提到资源邀约的概念,即由Master向注册其上的Framework发送资源邀约。 每次资源邀约包含一份Slave节点上可用的CPU、RAM等资源的列表。 Master提供这些资源给它的Framework,是基于分配策略的。分配策略对所有的Framework普遍适用,同时适用于特定的Framework。 Framework可以拒绝资源邀约,如果它不满足要求,若此,资源邀约随即可以发给其他Framework。 由Mesos管理的应用程序通常运行短周期的任务,因此这样可以快速释放资源,缓解Framework的资源饥饿; Slave定期向Master报告其可用资源,以便Master能够不断产生新的资源邀约。 另外,还可以使用诸如此类的技术, 每个Fraamework过滤不满足要求的资源邀约、Master主动废除给定周期内一直没有被接受的邀约。

分配策略有助于Mesos Master判断是否应该把当前可用资源提供给特定的Framework,以及应该提供多少资源。 关于Mesos中使用资源分配以及可插拔的分配模块,实现非常细粒度的资源共享,会单独写一篇文章。 言归正传,Mesos实现了公平共享和严格优先级(这两个概念我会在资源分配那篇讲)分配模块, 确保大部分用例的最佳资源共享。已经实现的新分配模块可以处理大部分之外的用例。

集大成者

现在来回答谈及Mesos时,“那又怎样”的问题。 对于我来说,令人兴奋的是Mesos集四大好处于一身(概述如下),正如我在前一篇文章中所述,我目测Mesos将为下一代数据中心的操作系统内核。

  • 效率 – 这是最显而易见的好处,也是Mesos社区和Mesosphere经常津津乐道的。

上图来自Mesosphere网站,描绘出Mesos为效率带来的好处。如今,在大多数数据中心中,服务器的静态分区是常态,即使使用最新的应用程序,如Hadoop。这时常令人担忧的是,当不同的应用程序使用相同的节点时,调度相互冲突,可用资源互相争抢。静态分区本质上是低效的,因为经常会面临,其中一个分区已经资源耗尽,而另一个分区的资源却没有得到充分利用,而且没有什么简单的方法能跨分区集群重新分配资源。使用Mesos资源管理器仲裁不同的调度器,我们将进入动态分区/弹性共享的模式,所有应用程序都可以使用节点的公共池,安全地、最大化地利用资源。 一个经常被引用的例子是Slave节点通常运行Hadoop作业,在Slave空闲阶段,动态分配给他们运行批处理作业,反之亦然。 值得一提的是,这其中的某些环节可以通过虚拟化技术,如VMware vSphere的分布式资源调度(DRS)来完成。 然而,Mesos具有更精细的粒度,因为Mesos在应用层而不是机器层分配资源,通过容器而不是整个虚拟机(VM)分配任务。 前者能够为每个应用程序的特殊需求做考量,应用程序的调度器知道最有效地利用资源; 后者能够更好地“装箱”,运行一个任务,没有必要实例化一整个虚拟机,其所需的进程和二进制文件足矣。

  • 敏捷 – 与效率和利用率密切相关,这实际上是我认为最重要的好处。 往往,效率解决的是“如何花最少的钱最大化数据中心的资源”,而敏捷解决的是“如何快速用上手头的资源。” 正如我和我的同事Tyler Britten经常指出,IT的存在是帮助企业赚钱和省钱的;那么如何通过技术帮助我们迅速创收,是我们要达到的重要指标。 这意味着要确保关键应用程序不能耗尽所需资源,因为我们无法为应用提供足够的基础设施,特别是在数据中心的其他地方都的资源是收费情况下。
  • 可扩展性 – 为可扩展而设计,这是我真心欣赏Mesos架构的地方。 这一重要属性使数据可以指数级增长、分布式应用可以水平扩展。 我们的发展已经远远超出了使用巨大的整体调度器或者限定群集节点数量为64的时代,足矣承载新形式的应用扩张。

    Mesos可扩展设计的关键之处是采用两级调度架构。 使用Framework代理任务的实际调度,Master可以用非常轻量级的代码实现,更易于扩展集群发展的规模。 因为Master不必知道所支持的每种类型的应用程序背后复杂的调度逻辑。 此外,由于Master不必为每个任务做调度,因此不会成为容量的性能瓶颈,而这在为每个任务或者虚拟机做调度的整体调度器中经常发生。

  • 模块化 – 对我来说,预测任何开源技术的健康发展,很大程度上取决于围绕该项目的生态系统。 我认为Mesos项目前景很好,因为其设计具有包容性,可以将功能插件化,比如分配策略、隔离机制和Framework。将容器技术,比如Docker和Rocket插件化的好处是显而易见。但是我想在此强调的是围绕Framework建设的生态系统。将任务调度委托给Framework应用程序,以及采用插件架构,通过Mesos这样的设计,社区创造了能够让Mesos问鼎数据中心资源管理的生态系统。因为每接入一种新的Framework,Master无需为此编码,Slave模块可以复用,使得在Mesos所支持的宽泛领域中,业务迅速增长。相反,开发者可以专注于他们的应用和Framework的选择。 当前而且还在不断地增长着的Mesos Framework列表参见此处以及下图:

 

持久化存储和容错

持久化存储的问题

正如我在前面中讨论过的,使用Mesos的主要好处是可以在同一组计算节点集合上运行多种类型的应用程序(调度以及通过Framework初始化任务)。这些任务使用隔离模块(目前是某些类型的容器技术)从实际节点中抽象出来,以便它们可以根据需要在不同的节点上移动和重新启动。

由此我们会思考一个问题,Mesos是如何处理持久化存储的呢?如果我在运行一个数据库作业,Mesos如何确保当任务被调度时,分配的节点可以访问其所需的数据?如图所示,在Hindman的示例中,使用Hadoop文件系统(HDFS)作为Mesos的持久层,这是HDFS常见的使用方式,也是Mesos的执行器传递分配指定任务的配置数据给Slave经常使用的方式。实际上,Mesos的持久化存储可以使用多种类型的文件系统,HDFS只是其中之一,但也是Mesos最经常使用的,它使得Mesos具备了与高性能计算的亲缘关系。其实Mesos可以有多种选择来处理持久化存储的问题:

  • 分布式文件系统。如上所述,Mesos可以使用DFS(比如HDFS或者Lustre)来保证数据可以被Mesos集群中的每个节点访问。这种方式的缺点是会有网络延迟,对于某些应用程序来说,这样的网络文件系统或许并不适合。
  • 使用数据存储复制的本地文件系统。另一种方法是利用应用程序级别的复制来确保数据可被多个节点访问。提供数据存储复制的应用程序可以是NoSQL数据库,比如Cassandra和MongoDB。这种方式的优点是不再需要考虑网络延迟问题。缺点是必须配置Mesos,使特定的任务只运行在持有复制数据的节点上,因为你不会希望数据中心的所有节点都复制相同的数据。为此,可以使用一个Framework,静态地为其预留特定的节点作为复制数据的存储。

  • 不使用复制的本地文件系统。也可以将持久化数据存储在指定节点的文件系统上,并且将该节点预留给指定的应用程序。和前面的选择一样,可以静态地为指定应用程序预留节点,但此时只能预留给单个节点而不是节点集合。后面两种显然不是理想的选择,因为实质上都需要创建静态分区。然而,在不允许延时或者应用程序不能复制它的数据存储等特殊情况下,我们需要这样的选择。

Mesos项目还在发展中,它会定期增加新功能。现在我已经发现了两个可以帮助解决持久化存储问题的新特性:

  • 动态预留。Framework可以使用这个功能框架保留指定的资源,比如持久化存储,以便在需要启动另一个任务时,资源邀约只会发送给那个Framework。这可以在单节点和节点集合中结合使用Framework配置,访问永久化数据存储。关于这个建议的功能的更多信息可以从此处获得。
  • 持久化卷。该功能可以创建一个卷,作为Slave节点上任务的一部分被启动,即使在任务完成后其持久化依然存在。Mesos为需要访问相同的数据后续任务,提供在可以访问该持久化卷的节点集合上相同的Framework来初始化。关于这个建议的功能的更多信息可以从此处获得。

容错

接下来,我们来谈谈Mesos在其协议栈上是如何提供容错能力的。恕我直言,Mesos的优势之一便是将容错设计到架构之中,并以可扩展的分布式系统的方式来实现。

  • Master。故障处理机制和特定的架构设计实现了Master的容错。

    首先,Mesos决定使用热备份(hot-standby)设计来实现Master节点集合。正如Tomas Barton对上图的说明,一个Master节点与多个备用(standby)节点运行在同一集群中,并由开源软件Zookeeper来监控。Zookeeper会监控Master集群中所有的节点,并在Master节点发生故障时管理新Master的选举。建议的节点总数是5个,实际上,生产环境至少需要3个Master节点。 Mesos决定将Master设计为持有软件状态,这意味着当Master节点发生故障时,其状态可以很快地在新选举的Master节点上重建。 Mesos的状态信息实际上驻留在Framework调度器和Slave节点集合之中。当一个新的Master当选后,Zookeeper会通知Framework和选举后的Slave节点集合,以便使其在新的Master上注册。彼时,新的 Master可以根据Framework和Slave节点集合发送过来的信息,重建内部状态。

  • Framework调度器。Framework调度器的容错是通过Framework将调度器注册2份或者更多份到Master来实现。当一个调度器发生故障时,Master会通知另一个调度来接管。需要注意的是Framework自身负责实现调度器之间共享状态的机制。
  • Slave。Mesos实现了Slave的恢复功能,当Slave节点上的进程失败时,可以让执行器/任务继续运行,并为那个Slave进程重新连接那台Slave节点上运行的执行器/任务。当任务执行时,Slave会将任务的监测点元数据存入本地磁盘。如果Slave进程失败,任务会继续运行,当Master重新启动Slave进程后,因为此时没有可以响应的消息,所以重新启动的Slave进程会使用检查点数据来恢复状态,并重新与执行器/任务连接。

如下情况则截然不同,计算节点上Slave正常运行而任务执行失败。在此,Master负责监控所有Slave节点的状态。

当计算节点/Slave节点无法响应多个连续的消息后,Master会从可用资源的列表中删除该节点,并会尝试关闭该节点。

然后,Master会向分配任务的Framework调度器汇报执行器/任务失败,并允许调度器根据其配置策略做任务失败处理。通常情况下,Framework会重新启动任务到新的Slave节点,假设它接收并接受来自Master的相应的资源邀约。

  • 执行器/任务。与计算节点/Slave节点故障类似,Master会向分配任务的Framework调度器汇报执行器/任务失败,并允许调度器根据其配置策略在任务失败时做出相应的处理。通常情况下,Framework在接收并接受来自Master的相应的资源邀约后,会在新的Slave节点上重新启动任务。

Mesos的资源分配

Apache Mesos能够成为最优秀的数据中心资源管理器的一个重要功能是面对各种类型的应用,它具备像交警一样的疏导能力。本文将深入Mesos的资源分配内部,探讨Mesos是如何根据客户应用需求,平衡公平资源共享的。我们将探讨Mesos的资源分配模块,看看它是如何确定将什么样的资源邀约发送给具体哪个Framework,以及在必要时如何回收资源。让我们先来回顾一下Mesos的任务调度过程:

从前面提到的两级架构的说明中我们知道,Mesos Master代理任务的调度首先从Slave节点收集有关可用资源的信息,然后以资源邀约的形式,将这些资源提供给注册其上的Framework。Framework可以根据是否符合任务对资源的约束,选择接受或拒绝资源邀约。一旦资源邀约被接受,Framework将与Master协作调度任务,并在数据中心的相应Slave节点上运行任务。

如何作出资源邀约的决定是由资源分配模块实现的,该模块存在于Master之中。资源分配模块确定Framework接受资源邀约的顺序,与此同时,确保在本性贪婪的Framework之间公平地共享资源。在同质环境中,比如Hadoop集群,使用最多的公平份额分配算法之一是最大最小公平算法(max-min fairness)。最大最小公平算法算法将最小的资源分配最大化,并将其提供给用户,确保每个用户都能获得公平的资源份额,以满足其需求所需的资源;一个简单的例子能够说明其工作原理,请参考最大最小公平份额算法页面的示例1。如前所述,在同质环境下,这通常能够很好地运行。同质环境下的资源需求几乎没有波动,所涉及的资源类型包括CPU、内存、网络带宽和I/O。然而,在跨数据中心调度资源并且是异构的资源需求时,资源分配将会更加困难。例如,当用户A的每个任务需要1核CPU、4GB内存,而用户B的每个任务需要3核CPU、1GB内存时,如何提供合适的公平份额分配策略?当用户A的任务是内存密集型,而用户B的任务是CPU密集型时,如何公平地为其分配一揽子资源?

因为Mesos是专门管理异构环境中的资源,所以它实现了一个可插拔的资源分配模块架构,将特定部署最适合的分配策略和算法交给用户去实现。例如,用户可以实现加权的最大最小公平性算法,让指定的Framework相对于其它的Framework获得更多的资源。默认情况下,Mesos包括一个严格优先级的资源分配模块和一个改良的公平份额资源分配模块。严格优先级模块实现的算法给定Framework的优先级,使其总是接收并接受足以满足其任务要求的资源邀约。这保证了关键应用在Mesos中限制动态资源份额上的开销,但是会潜在其他Framework饥饿的情况。

由于这些原因,大多数用户默认使用DRF(主导资源公平算法 Dominant Resource Fairness),这是Mesos中更适合异质环境的改良公平份额算法。DRF和Mesos一样出自Berkeley AMPLab团队,并且作为Mesos的默认资源分配策略实现编码。读者可以从此处此处阅读DRF的原始论文。在本文中,我将总结其中要点并提供一些例子,相信这样会更清晰地解读DRF。让我们开始揭秘之旅。

DRF的目标是确保每一个用户,即Mesos中的Framework,在异质环境中能够接收到其最需资源的公平份额。为了掌握DRF,我们需要了解主导资源(dominant resource)和主导份额(dominant share)的概念。Framework的主导资源是其最需的资源类型(CPU、内存等),在资源邀约中以可用资源百分比的形式展示。例如,对于计算密集型的任务,它的Framework的主导资源是CPU,而依赖于在内存中计算的任务,它的Framework的主导资源是内存。因为资源是分配给Framework的,所以DRF会跟踪每个Framework拥有的资源类型的份额百分比;Framework拥有的全部资源类型份额中占最高百分比的就是Framework的主导份额。DRF算法会使用所有已注册的Framework来计算主导份额,以确保每个Framework能接收到其主导资源的公平份额。

概念过于抽象了吧?让我们用一个例子来说明。假设我们有一个资源邀约,包含9核CPU和18GB的内存。Framework 1运行任务需要(1核CPU、4GB内存),Framework 2运行任务需要(3核CPU、1GB内存)
Framework 1的每个任务会消耗CPU总数的1/9、内存总数的2/9,因此Framework 1的主导资源是内存。同样,Framework 2的每个任务会CPU总数的1/3、内存总数的1/18,因此Framework 2的主导资源是CPU。DRF会尝试为每个Framework提供等量的主导资源,作为他们的主导份额。在这个例子中,DRF将协同Framework做如下分配:Framework 1有三个任务,总分配为(3核CPU、12GB内存),Framework 2有两个任务,总分配为(6核CPU、2GB内存)。

此时,每个Framework的主导资源(Framework 1的内存和Framework 2的CPU)最终得到相同的主导份额(2/3或67%),这样提供给两个Framework后,将没有足够的可用资源运行其他任务。需要注意的是,如果Framework 1中仅有两个任务需要被运行,那么Framework 2以及其他已注册的Framework将收到的所有剩余的资源。

那么,DRF是怎样计算而产生上述结果的呢?如前所述,DRF分配模块跟踪分配给每个Framework的资源和每个框架的主导份额。每次,DRF以所有Framework中运行的任务中最低的主导份额作为资源邀约发送给Framework。如果有足够的可用资源来运行它的任务,Framework将接受这个邀约。通过前面引述的DRF论文中的示例,我们来贯穿DRF算法的每个步骤。为了简单起见,示例将不考虑短任务完成后,资源被释放回资源池中这一因素,我们假设每个Framework会有无限数量的任务要运行,并认为每个资源邀约都会被接受。

回顾上述示例,假设有一个资源邀约包含9核CPU和18GB内存。Framework 1运行的任务需要(1核CPU、4GB内存),Framework 2运行的任务需要(3核CPU、2GB内存)。Framework 1的任务会消耗CPU总数的1/9、内存总数的2/9,Framework 1的主导资源是内存。同样,Framework 2的每个任务会CPU总数的1/3、内存总数的1/18,Framework 2的主导资源是CPU。

上面表中的每一行提供了以下信息:

  • Framework chosen——收到最新资源邀约的Framework。
  • Resource Shares——给定时间内Framework接受的资源总数,包括CPU和内存,以占资源总量的比例表示。
  • Dominant Share(主导份额)——给定时间内Framework主导资源占总份额的比例,以占资源总量的比例表示。
  • Dominant Share %(主导份额百分比)——给定时间内Framework主导资源占总份额的百分比,以占资源总量的百分比表示。
  • CPU Total Allocation——给定时间内接受的所有Framework的总CPU资源。
  • RAM Total Allocation——给定时间内接受的所有Framework的总内存资源。

注意,每个行中的最低主导份额以粗体字显示,以便查找。

最初,两个Framework的主导份额是0%,我们假设DRF首先选择的是Framework 2,当然我们也可以假设Framework 1,但是最终的结果是一样的。

  1. Framework 2接收份额并运行任务,使其主导资源成为CPU,主导份额增加至33%。
  2. 由于Framework 1的主导份额维持在0%,它接收共享并运行任务,主导份额的主导资源(内存)增加至22%。
  3. 由于Framework 1仍具有较低的主导份额,它接收下一个共享并运行任务,增加其主导份额至44%。
  4. 然后DRF将资源邀约发送给Framework 2,因为它现在拥有更低的主导份额。
  5. 该过程继续进行,直到由于缺乏可用资源,不能运行新的任务。在这种情况下,CPU资源已经饱和。
  6. 然后该过程将使用一组新的资源邀约重复进行。

需要注意的是,可以创建一个资源分配模块,使用加权的DRF使其偏向某个Framework或某组Framework。如前面所提到的,也可以创建一些自定义模块来提供组织特定的分配策略。

一般情况下,现在大多数的任务是短暂的,Mesos能够等待任务完成并重新分配资源。然而,集群上也可以跑长时间运行的任务,这些任务用于处理挂起作业或行为不当的Framework。

值得注意的是,在当资源释放的速度不够快的情况下,资源分配模块具有撤销任务的能力。Mesos尝试如此撤销任务:向执行器发送请求结束指定的任务,并给出一个宽限期让执行器清理该任务。如果执行器不响应请求,分配模块就结束该执行器及其上的所有任务。

分配策略可以实现为,通过提供与Framework相关的保证配置,来阻止对指定任务的撤销。如果Framework低于保证配置,Mesos将不能结束该Framework的任务。

我们还需了解更多关于Mesos资源分配的知识,但是我将戛然而止。接下来,我要说点不同的东西,是关于Mesos社区的。我相信这是一个值得考虑的重要话题,因为开源不仅包括技术,还包括社区。

说完社区,我将会写一些关于Mesos的安装和Framework的创建和使用的,逐步指导的教程。在一番实操教学的文章之后,我会回来做一些更深入的话题,比如Framework与Master是如何互动的,Mesos如何跨多个数据中心工作等。

Apache Marathon

 

转自InfoQ

http://www.infoq.com/cn/minibooks/analyse-mesos?utm_source=infoq&utm_medium=related_content_link&utm_campaign=relatedContent_articles_clk