
DAG: the core technology of the MaxCompute execution engine

2021-08-31 16:05:05 Heart of machine

As one of the few EB-scale distributed data platforms in the industry, MaxCompute runs tens of millions of distributed jobs every day. At this scale, the platform must support jobs with widely varying characteristics: ultra-large "Alibaba-scale" jobs with hundreds of thousands of compute nodes, unique in the big data ecosystem, as well as vast numbers of small and medium-sized distributed jobs. At the same time, users with jobs of different sizes and characteristics have different expectations regarding running time, resource efficiency, and data throughput.


Fig.1 MaxCompute online data analysis

To serve jobs of different scales, MaxCompute currently provides two different execution modes, which the following table summarizes and compares:


Fig.2 Offline (batch) mode vs. integrated-scheduling quasi-real-time (smode) mode

As the table shows, offline jobs and integrated-scheduling quasi-real-time jobs differ significantly in scheduling mode, data transfer, and resource usage. The two modes represent two extremes for massive-data scenarios: requesting resources on demand to optimize throughput and resource utilization, versus pre-launching the full set of compute nodes (with direct data transfer) to reduce execution latency for small and medium data volumes. These differences ultimately show up in execution time and job resource consumption. Naturally, offline mode, whose main optimization objective is high throughput, and quasi-real-time mode, which pursues low latency, differ greatly in their performance profiles. Take the 1TB-TPCH standard benchmark as an example and compare execution performance against resource consumption: quasi-real-time (SMODE) mode has a clear performance advantage (2.3X), but this improvement is not free. In this particular TPCH scenario, integrated SMODE execution gains a 2.3X performance improvement while also consuming 3.2X the system resources (cpu * time).


Fig.3 Performance / resource consumption comparison: offline (batch) mode vs. integrated-scheduling quasi-real-time (smode) mode

This observation is not surprising; to some extent it is by design. Looking at the DAG produced by a typical SQL query in the figure below: all compute nodes are pulled up as soon as the job is submitted. Such scheduling allows data to be pipelined (when possible), potentially speeding up processing. However, not all upstream and downstream compute nodes in every execution plan can form an idealized pipelined dataflow. In fact, for many jobs, every compute node other than the DAG's root nodes (node M in the figure below) sits idle to some degree while waiting for data, wasting resources.


Fig.4 Possible inefficient resource use in integrated-scheduling quasi-real-time (smode) mode

The inefficient resource usage caused by this idling is especially pronounced when the data-processing flow contains barrier operators that cannot be pipelined, and when the DAG is deep. Of course, for scenarios where job runtime must be optimized to the extreme, trading more resource consumption for ultimate performance is reasonable. In fact, in some business-critical online service systems, average single-digit CPU utilization is not uncommon, precisely to guarantee fast responses and capacity for peak data at all times. But for a distributed system at the scale of a computing platform, is it possible to strike a better balance between extreme performance and efficient resource utilization?

The answer is yes. This is the hybrid computing model we introduce here: Bubble Execution.

1. Bubble Execution overview

The core architectural idea of the DAG framework lies in the clean separation between the logical layer and the physical layer of the execution plan. A physical execution graph is materialized from the logical graph by assigning physical properties to its nodes and edges (such as data transfer medium, scheduling timing, and resource characteristics). For the batch and smode patterns described in Fig.2, DAG provides a flexible scheduling and execution framework that unifies offline mode and quasi-real-time integrated execution. As the figure below shows, by adjusting the physical properties of compute nodes and data edges, the framework can not only express the two existing computing modes clearly, but, after a more general extension, can also explore a new hybrid execution mode: Bubble Execution.


Fig.5 Multiple computing modes on the DAG framework

Intuitively, a Bubble is a large scheduling unit: resources for all nodes inside a Bubble are requested together, and data between upstream and downstream nodes inside it is transferred directly over the network or through memory. Data on edges connecting different Bubbles, by contrast, is transferred by spilling to disk. From this viewpoint, offline and quasi-real-time execution can be seen as the two extreme cases of Bubble execution: offline mode is the special case where every stage is its own single-stage bubble, while the quasi-real-time framework places all of a job's compute nodes into one large Bubble for integrated scheduling at the other extreme. DAG AM has unified the two computing modes onto a single scheduling and execution infrastructure, making it possible to combine the advantages of both and laying the foundation for introducing Bubble Execution.
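The two extremes above can be pictured as two ways of partitioning a job's stages into bubbles. A toy illustration (stage names are made up; this is not MaxCompute's API):

```python
# Hypothetical sketch: offline and quasi-real-time execution seen as the two
# extreme Bubble partitions of a job's stage list.

def offline_partition(stages):
    """Offline (batch) mode: every stage is its own single-stage bubble."""
    return [[s] for s in stages]

def smode_partition(stages):
    """Quasi-real-time (smode): all stages form one big bubble,
    whose resources are requested and launched together."""
    return [list(stages)]

stages = ["M1", "R2", "J3", "R4"]
print(offline_partition(stages))  # four singleton bubbles
print(smode_partition(stages))    # one bubble containing all stages
```

Bubble Execution, described next, sits between these two partitions: some stages are grouped into medium-sized bubbles and the rest stay as offline singletons.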

Through flexible, adaptive subgraph (Bubble) cutting, Bubble Execution provides a finer-grained, more general scheduling and execution method between the two existing extremes: a way to reach the optimal tradeoff between job performance and resource utilization. After analyzing input data volume, operator properties, job scale, and other information, DAG's Bubble execution mode can split an offline job into multiple Bubbles, making full use of network/memory direct connections and pre-warmed compute nodes inside each Bubble to improve performance. Under this scheme, the compute nodes of a DAG may be cut into different Bubbles depending on their positions in the DAG, or not cut into any Bubble at all (still running in traditional offline mode). This highly flexible hybrid execution mode makes the whole job adaptive to the characteristics of a wide variety of online jobs, which matters greatly in real production:

  • Bubble mode makes it possible to accelerate more jobs: admission to integrated scheduling for quasi-real-time jobs is a "one size fits all" condition based on overall job scale (online default: 2000 nodes). This is partly for fair use of limited resources, and partly to control the cost of node failures. But for medium and large jobs whose overall scale exceeds the admission threshold, individual subgraphs inside them may still be exactly the right size to accelerate through data pipelining and similar methods. In addition, some compute nodes online cannot execute in the pre-warmed quasi-real-time resource pool because of their own characteristics (for example, containing UDFs or other user logic that requires a security sandbox). Under the current all-or-nothing pattern, a job containing even one such node cannot be executed in accelerated mode at all. Bubble mode solves these problems well.
  • Bubble mode connects the two online resource pools: today the offline (cold) resource pool and the quasi-real-time (warm) resource pool are two completely isolated, separately managed pools with different characteristics. This separation can waste resources. For example, a large-scale job may queue for offline resources because the quasi-real-time pool is completely unavailable to it, while the quasi-real-time pool sits idle, and vice versa. By mixing resources from different pools within a single job, Bubble mode lets the two pools complement each other, smoothing out each other's peaks and valleys.
  • Bubble mode improves overall resource utilization: for medium-sized jobs that do meet the quasi-real-time admission criteria, integrated scheduling improves running speed but objectively causes a certain amount of idling and resource waste (especially when the DAG is deep and the computation contains barriers). In such cases, splitting the integrated mode into multiple Bubbles, based on node count, barrier operators, and similar conditions, effectively reduces idle node consumption; when the split conditions are reasonable, the performance loss can also be kept low.
  • Bubble mode effectively reduces the cost of a single compute node failure: in integrated quasi-real-time execution, because of its data-pipelining characteristics, the fault-tolerance granularity of a job is tied to its scheduling granularity: both are all-or-nothing. In other words, if even one node fails, the entire job must be rerun. Since larger jobs have a higher probability of node failure during execution, this failover granularity limits the maximum job size the mode can support. Bubble mode provides a better balance: the failure of a single compute node affects at most the nodes in the same Bubble. Bubble mode also applies various fine-grained treatments to different failover scenarios, described below.

We can use the standard TPCH-1TB benchmark to visually evaluate the effect of Bubble execution. With the upper computing engine (MaxCompute optimizer, runtime, etc.) unchanged, and the Bubble size capped at 500 (the specific Bubble segmentation rules are described below), we compare Bubble execution mode against standard offline mode and quasi-real-time mode on two dimensions: performance (latency) and resource consumption (cpu * time):


Fig.6.a Performance (latency) comparison: Bubble mode vs. offline (batch) mode vs. integrated-scheduling quasi-real-time (smode) mode

In terms of running time, Bubble mode is clearly much better than offline mode (roughly a 2X overall performance improvement), and compared with quasi-real-time integrated scheduling, Bubble execution performance does not drop significantly. Of course, for queries whose data can be pipelined very effectively (such as Q5 and Q8), quasi-real-time execution still has a certain advantage. But SMODE's execution-time advantage is not free: once resource consumption is also considered, as the chart below shows, the performance improvement of quasi-real-time jobs is premised on resource consumption far greater than Bubble mode's, whereas Bubble mode, while performing much better than offline mode, consumes roughly the same resources overall.


Fig.6.b Resource consumption (cpu * time) comparison: Bubble mode vs. offline (batch) mode vs. integrated-scheduling quasi-real-time (smode) mode

Putting it all together, Bubble Execution combines the advantages of batch mode and quasi-real-time mode well:

  • In execution time: for every query in the TPCH test set, bubble mode executes faster than batch mode; the total time across all 22 queries is reduced by nearly 2X, approaching the latency of service mode;
  • In resource consumption: bubble mode is essentially on par with batch mode and greatly reduced compared with service mode, an overall 2.6X reduction.


Fig.6.c Overall comparison of Bubble mode with offline / quasi-real-time modes

It is worth noting that in the TPCH benchmark comparison above, we simplified the Bubble segmentation conditions: we only capped the overall bubble size at 500 without fully considering conditions such as barriers. Tuning the bubble cutting further, for example by trying to cut nodes that can effectively pipeline data into the same bubble, can further improve both job execution performance and resource utilization. This is something we attend to when bringing the mode online in the actual production system; see Section 3 for concrete online results.

Having covered the overall design and architecture of the Bubble execution mode, we next discuss the concrete implementation details of Bubble mode and the work required to push this new hybrid execution mode online.

2. Bubble segmentation and execution

A job using Bubble Execution (hereafter, a Bubble job), like a traditional offline job, manages the whole DAG's lifecycle through a DAG master (aka Application Master). The AM is responsible for cutting the DAG into reasonable bubbles and for the corresponding resource requests and scheduling. Overall, compute nodes inside a Bubble are accelerated on both fronts: they use pre-launched compute nodes, and their data is pipelined through memory/network direct transfer. Compute nodes not inside any bubble execute in the classic offline mode, and data on edges not inside a bubble (including edges crossing a bubble boundary) is transferred by spilling to disk.


Fig.7 Hybrid Bubble execution mode

The Bubble segmentation method determines the execution time and resource utilization of the job; it must comprehensively consider the concurrency of compute nodes, the operator properties inside nodes, and other information. Once bubbles are cut, Bubble execution must organically combine the execution of nodes with pipelined/barrier shuffle of data. We describe each aspect separately below.

2.1 Bubble segmentation principles

The core idea of Bubble Execution is to split an offline job into multiple Bubbles for execution. To cut out bubbles, several factors must be considered:

  • Operator properties inside compute nodes: for a scheduling mode that launches all of a bubble's compute nodes at once, whether data can be effectively pipelined between upstream and downstream nodes inside the bubble largely determines whether downstream nodes inside the bubble will waste resources by idling. Therefore, when a node contains barrier operators that may block the data pipeline, the segmentation logic avoids cutting it into the same bubble as its downstream nodes.
  • Number of compute nodes inside a single Bubble: as discussed earlier, when an integrated resource request/run includes too many compute nodes, the request may be unfulfillable, or, even if fulfilled, the cost of a failure may be uncontrollable. Limiting the Bubble size avoids the negative effects of an oversized integrated run.
  • The direction of iteration when aggregating compute nodes into Bubbles: given the bubble size limit, cutting bubbles top-down versus bottom-up can lead to different segmentation results. For most online jobs, the data processed narrows as it flows, so the corresponding DAGs are mostly inverted triangles; the bottom-up algorithm is therefore used by default, iterating from the nodes farthest from the root vertex.

Among the above factors, an operator's barrier property is supplied by the upper computing engine (e.g., MaxCompute's optimizer). Generally speaking, operators that rely on global sort operations (such as MergeJoin and SortedAggregate) are considered to block data flow (barrier), while operators based on hashing are more pipeline-friendly. For the number of compute nodes allowed in a single Bubble, the default upper limit of 500 was chosen based on our analysis of online quasi-real-time job characteristics and actual grayscale experiments with Bubble jobs. This is a reasonable value in most scenarios: it ensures the full set of resources can be obtained quickly, and since the amount of data processed is roughly positively correlated with the DoP, bubbles of this scale generally have no memory-overrun problems. Of course, all of these parameters and configurations can be fine-tuned per job through configuration, and the Bubble execution framework will also provide the ability to adjust them dynamically while a job is running.

In the DAG system, one of the physical properties of an edge is whether the upstream and downstream nodes it connects have a strict run-order dependency. Traditional offline mode runs upstream before downstream, corresponding to a sequential property; we call such an edge a sequential edge. Upstream and downstream nodes inside a bubble, on the other hand, are scheduled and run simultaneously; we call the edge connecting them a concurrent edge. Note that in the bubble scenario, the concurrent/sequential physical property happens to coincide with the data-transfer physical property (network/memory direct transfer vs. spilling to disk). (Note: the two remain separate physical properties; for example, when necessary, data on a concurrent edge can also be transferred by spilling to disk.)
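The point that run-order and transport are separate (though usually paired) physical properties can be sketched as follows; the `Edge` shape and field names are hypothetical, not MaxCompute's real data model:

```python
# Sketch: an edge carries two *separate* physical properties -- run-order
# (sequential vs concurrent) and transport (disk vs network/memory direct).
# In bubble mode they usually coincide, but a concurrent edge may still
# spill to disk when necessary.
from dataclasses import dataclass

@dataclass
class Edge:
    src: str
    dst: str
    order: str = "sequential"   # "sequential" | "concurrent"
    transport: str = "disk"     # "disk" | "direct" (network/memory)

def default_transport(order):
    # The usual pairing; callers may override it, e.g.
    # Edge("V2", "V4", order="concurrent", transport="disk")
    # for a concurrent edge that has to spill.
    return "direct" if order == "concurrent" else "disk"

e = Edge("V2", "V4", order="concurrent")
e.transport = default_transport(e.order)
print(e)  # concurrent edge with direct (network/memory) transfer
```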

Based on this layered abstraction, the Bubble segmentation algorithm is essentially a process of trying to aggregate the nodes of the DAG, restoring to sequential edges any concurrent edges that do not satisfy the bubble admission conditions. In the end, each subgraph connected by concurrent edges is a bubble. We use a practical example to show how the segmentation algorithm works. Suppose we have the DAG shown below, where each circle represents a compute vertex and the number in each circle indicates the vertex's actual concurrency. V1 and V3 are marked at job-submission time as barrier vertices because they contain barrier operators. The lines between circles represent the edges connecting upstream and downstream. Orange lines represent (initial) concurrent edges and black lines represent sequential edges. In the initial state, sequential edges are determined by the rule that the output edges of a barrier vertex are sequential; all other edges are initialized as concurrent edges by default.


Fig.8 Example DAG (initial state)

Starting from this initial DAG, following the overall principles described above (plus some implementation details described at the end of this chapter), multiple rounds of algorithm iteration produce the Bubble segmentation result below: two Bubbles, Bubble#0 [V2, V4, V7, V8] and Bubble#1 [V6, V10]; the other nodes are judged to run in offline mode.


Fig.9 Example DAG: Bubble segmentation result

The segmentation process above traverses the vertices bottom-up, adhering to the following principles:

If the current vertex cannot join a bubble, all of its input edges are reduced to sequential edges (such as V9 in the DAG);

If the current vertex can join a bubble, a breadth-first traversal aggregates vertices to grow the bubble: first the vertices connected by input edges are examined, then those connected by output edges; for any vertex that cannot join, its edge is reverted to a sequential edge (for example, when the traversal reaches V2's output vertex V5, the total task count would exceed 500, triggering an edge restore).

A vertex may be added to a bubble only when all of the following conditions hold:

  • the vertex is not connected to the current bubble by any sequential edge;
  • adding the vertex introduces no circular dependency with the current bubble, namely:
    • Case#1: none of the vertex's downstream vertices is an upstream of the current bubble;
    • Case#2: none of the vertex's upstream vertices is a downstream of the current bubble;
    • Case#3: none of the bubbles downstream of the vertex is an upstream of the current bubble;
    • Case#4: none of the bubbles upstream of the vertex is a downstream of the current bubble.

Note: upstream/downstream here includes not only the vertex's immediate successors/predecessors but also its indirect ones.


Fig.10 Scenarios in which cutting a Bubble may create cyclic dependencies
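The bottom-up aggregation described above can be sketched in a few lines. This is a much-simplified, hypothetical version: it keeps only the barrier exclusion, the 500-task cap, and the breadth-first growth, and omits the four cyclic-dependency checks of Fig.10 as well as the memory/UDF/HBO conditions mentioned below:

```python
# Simplified sketch of bottom-up bubble cutting (names and structure are
# illustrative, not MaxCompute's real implementation).
from collections import deque

MAX_BUBBLE_TASKS = 500  # default per-bubble task-count cap

def cut_bubbles(vertices, edges, concurrency, barrier):
    """vertices: ids ordered bottom-up (farthest from the root first).
    edges: vertex -> set of neighbors reachable over concurrent-edge
    candidates (a barrier vertex's output edges are already sequential
    and therefore absent here).
    concurrency: vertex -> task count.  barrier: set of barrier vertices."""
    assigned = {}
    bubbles = []
    for v in vertices:
        if v in assigned or v in barrier:
            continue                      # barrier vertices stay offline
        bubble, size = [], 0
        queue = deque([v])                # grow a bubble breadth-first
        while queue:
            u = queue.popleft()
            if u in assigned or u in barrier:
                continue
            if size + concurrency[u] > MAX_BUBBLE_TASKS:
                continue                  # edge to u restored to sequential
            bubble.append(u)
            size += concurrency[u]
            assigned[u] = len(bubbles)
            queue.extend(edges.get(u, ()))
        if len(bubble) > 1:
            bubbles.append(bubble)
        else:
            assigned.pop(v, None)         # a lone vertex stays offline
    return bubbles
```

For instance, on a chain A-B-C-D with concurrencies 100/100/450/10 and no barriers, the 500-task cap splits the chain into two bubbles, [D, C] and [B, A], mirroring how V5 was excluded from V2's bubble in the example above.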

The actual online bubble segmentation also takes into account information such as actual resources and expected running time: for example, whether a compute node's planned memory exceeds a threshold, whether it contains UDF operators, whether its execution time estimated from historical information (HBO) in production jobs is too long, and so on. We omit the details here.

2.2 Bubble scheduling and execution

2.2.1 Bubble scheduling

To accelerate computation, compute nodes inside a Bubble come by default from the resident pre-warmed resource pool, the same as in the quasi-real-time execution framework. At the same time, we provide flexible pluggability: if necessary, Bubble compute nodes can be requested on the spot from the Resource Manager (switchable through configuration).

In terms of scheduling timing, the scheduling strategy of a node inside a Bubble depends on the characteristics of its input edges, falling into the following cases:

  • A bubble root vertex with no input edges (such as V2 in Fig.9): pulled up by the scheduler as soon as the job starts.
  • A bubble root vertex with only sequential-edge inputs (such as V6 in Fig.9): scheduled only after the completion ratio of its upstream nodes reaches the configured min fraction (default 100%, i.e., all upstream nodes complete).
  • A vertex inside a bubble, i.e., one all of whose input edges are concurrent edges (such as V4, V8, and V10 in Fig.9): because it is connected entirely by concurrent edges, it is naturally triggered together with its upstream.
  • A bubble root vertex on the boundary with mixed inputs (such as V7 in Fig.9). This case needs special treatment: although V7 is linked to V4 by a concurrent edge, V7 is also controlled by V3 through a sequential edge, so V7 can actually be scheduled only after V3 reaches its min-fraction. For this scenario, V3's min-fraction can be configured smaller (even 0) to trigger V7 earlier; in addition, the progressive scheduling capability that Bubble provides internally also helps here.

Taking Bubble#1 in Fig.9 as an example: it has only one external sequential-edge dependency, so when V2 completes, the scheduling of V6 + V10 is triggered together (through the concurrent edge), bringing the whole of Bubble#1 up.
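The four scheduling-timing cases above can be condensed into one decision function. A toy sketch (edge labels and return strings are invented for illustration):

```python
# Illustrative dispatch over the four scheduling-timing cases described
# above; min_fraction mirrors the configurable upstream-completion threshold.

def schedule_trigger(input_edges, upstream_done_fraction=None,
                     min_fraction=1.0):
    """input_edges: 'sequential'/'concurrent' labels of a vertex's inputs.
    Returns when the vertex gets scheduled."""
    if not input_edges:
        return "at job start"                  # no-input bubble root
    if all(e == "concurrent" for e in input_edges):
        return "together with upstream"        # vertex inside a bubble
    # sequential (or mixed) inputs: gated on upstream min-fraction
    if upstream_done_fraction is not None and \
            upstream_done_fraction >= min_fraction:
        return "schedule now"
    return "wait for upstream min-fraction"

print(schedule_trigger([]))                                # V2-like
print(schedule_trigger(["sequential"], 1.0))               # V6-like
print(schedule_trigger(["concurrent", "concurrent"]))      # V4/V8/V10-like
print(schedule_trigger(["concurrent", "sequential"], 0.4)) # V7-like: waits
```

Lowering `min_fraction` for the sequential upstream (even to 0) is what triggers a mixed-input root like V7 earlier, as the text notes.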

After a Bubble's scheduling is triggered, it requests resources directly from the SMODE Admin. The default is the integrated Gang-Scheduling (GS) resource-request model: the whole Bubble is built into one request and sent to the Admin. When the Admin has enough resources to satisfy the request, it sends the scheduling result, including the pre-launched worker information, to the AM running the bubble.


Fig.11 Resource interaction between Bubble and Admin

To support scenarios with tight resources or dynamic adjustment inside a Bubble, Bubble also supports the Progressive resource-request model. This model allows each vertex of a Bubble to request resources and be scheduled independently: the Admin sends incremental scheduling results to the AM as resources become available, until the vertex's request is fully satisfied. We will not expand on the applications unique to this scenario for now.
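The contrast between the two request models can be sketched with a toy Admin; the function names, demand shapes, and all-at-once semantics below are illustrative assumptions, not the real SMODE Admin protocol:

```python
# Toy contrast of the two resource-request models described above.

def gang_request(bubble_demand, available):
    """Gang-Scheduling: the whole bubble's demand is one request,
    granted only if the Admin can satisfy all of it at once."""
    total = sum(bubble_demand.values())
    return dict(bubble_demand) if available >= total else None

def progressive_request(bubble_demand, available):
    """Progressive: each vertex is granted independently and
    incrementally until its demand is fully satisfied."""
    granted = {}
    for vertex, need in bubble_demand.items():
        got = min(need, available)
        available -= got
        if got:
            granted[vertex] = got
    return granted

demand = {"V6": 200, "V10": 150}
print(gang_request(demand, 300))         # None: all-or-nothing fails
print(progressive_request(demand, 300))  # V6 fully, V10 partially granted
```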

After the upgrade of the quasi-real-time execution framework, resource management (Admin) and multi-DAG job-management logic (MultiJobManager) in the SMODE service have been decoupled, so the resource-request logic in bubble mode only interacts with the Admin, without any impact on the DAG execution-management logic of normal quasi-real-time jobs. In addition, to support online grayscale hot upgrades, each resident compute node in the Admin-managed resource pool runs in an Agent + multi-labor mode; when scheduling a concrete resource, the Admin matches worker versions against the AM version and dispatches a labor that meets the conditions to the Bubble job.

2.2.2 Bubble data shuffle

For sequential edges crossing a Bubble boundary, data is transferred by spilling to disk, the same as in ordinary offline jobs. Here we mainly discuss data transfer inside a Bubble. According to the bubble segmentation principles described earlier, the inside of a bubble usually has good data-pipeline characteristics and the data volume is small, so data on concurrent edges inside a bubble uses the fastest shuffle available: direct network/memory transfer.

Network shuffle works the same way as in classic quasi-real-time jobs: a TCP connection is established between the upstream and downstream nodes, and data is sent directly over the network. This push-based transfer requires upstream and downstream to be launched simultaneously, and the requirement propagates along the dependency chain, so the network push pattern depends strongly on Gang-Scheduling; it also limits bubble flexibility in fault tolerance and long-tail avoidance.

To better address these problems, Bubble mode explores a memory shuffle pattern. In this mode, the upstream node writes data directly into the memory of the cluster's ShuffleAgent (SA), and the downstream node reads the data from the SA. The fault-tolerance and scaling capabilities of memory shuffle, including asynchronously spilling part of the data to disk when memory is insufficient to guarantee higher availability, are provided independently by the ShuffleService. This mode supports both the Gang-Scheduling and Progressive scheduling modes, and it is highly extensible: for example, SA locality scheduling enables more local data reads, and lineage-based instance-level retry implements a finer-grained fault-tolerance mechanism, and so on.


Fig.12 Network shuffle vs. memory shuffle

Given the many extensibility benefits memory shuffle provides, it is the default shuffle mode for online Bubble jobs; direct network transfer is an alternative that very small jobs with low fault-tolerance cost may enable through configuration.
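The decoupling memory shuffle introduces, with the upstream writing to an agent and spilling when over budget while the downstream reads from the agent either way, can be modeled with a toy class. This is only an illustration of the idea; the class name, synchronous spill, and key format are assumptions, not the ShuffleService API:

```python
# Toy model of the memory-shuffle path: upstream writes into a
# ShuffleAgent's memory, "spilling" to disk when a memory budget is
# exceeded; the downstream read works identically in both cases.

class ShuffleAgent:
    def __init__(self, memory_budget):
        self.memory_budget = memory_budget
        self.in_memory = {}   # key -> payload held in memory
        self.on_disk = {}     # spilled payloads (still readable)
        self.used = 0

    def write(self, key, payload):
        if self.used + len(payload) <= self.memory_budget:
            self.in_memory[key] = payload
            self.used += len(payload)
        else:
            self.on_disk[key] = payload   # spill keeps the data available

    def read(self, key):
        if key in self.in_memory:
            return self.in_memory[key]
        return self.on_disk.get(key)

sa = ShuffleAgent(memory_budget=10)
sa.write("V6->V10/part0", b"12345678")   # fits in memory
sa.write("V6->V10/part1", b"abcdefgh")   # exceeds budget -> spilled
print(sa.read("V6->V10/part1"))          # still readable after spill
```

Because the downstream talks to the agent rather than to a live upstream TCP peer, the two sides no longer need to be alive simultaneously, which is what frees the mode from strict Gang-Scheduling.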

2.3 Fault tolerance

As a brand-new hybrid execution mode, Bubble Execution explores various fine-grained balances between offline jobs and integrated-scheduling quasi-real-time jobs. On complex online clusters, failures of various kinds are inevitable during execution. To minimize the impact of failures and achieve the best balance between reliability and performance, bubble mode's failure-handling strategies are correspondingly more diversified.

For different exceptional conditions we designed targeted fault-tolerance strategies, handling, through measures from fine to coarse, the various exception scenarios that may arise during execution: failure to request resources from the admin, task execution failure inside a bubble (bubble-rerun), repeated bubble failures triggering fallback (bubble-renew), AM failover during execution, and so on.

2.3.1 Bubble Rerun

Currently, when a compute node inside a Bubble fails, the default retry strategy is to rerun the bubble. When one attempt of a node in the bubble fails, the whole bubble is rerun immediately: the still-running attempts of the same version are canceled and their resources returned, and then the bubble's re-execution is triggered. This guarantees that all compute nodes in the bubble run with the same (retry) attempt version.
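A minimal sketch of this policy, with purely illustrative data shapes (the real AM tracks far more state per attempt):

```python
# Rerun-whole-bubble policy: one failed attempt cancels the sibling
# attempts of the same version and bumps every node of the bubble to
# a new, matching attempt version.

def rerun_bubble(attempt_versions, failed_node):
    """attempt_versions: node -> current attempt number for one bubble.
    Returns (cancelled_nodes, new_versions)."""
    if failed_node not in attempt_versions:
        raise KeyError(failed_node)
    cancelled = [n for n in attempt_versions if n != failed_node]
    new_version = max(attempt_versions.values()) + 1
    # every node of the bubble restarts with the same attempt version
    return cancelled, {n: new_version for n in attempt_versions}

cancelled, versions = rerun_bubble({"V6": 0, "V10": 0}, failed_node="V10")
print(cancelled)   # V6 is cancelled even though it did not fail
print(versions)    # both rerun as attempt 1
```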

There are many triggers for a bubble rerun; common ones include:

  • Instance Failed: Compute node execution failed , Usually by the upper engine runtime Error trigger ( For example, throw out retryable-exception).
  • Resource Revoked: Online production environment , There are many scenarios that will cause the asset node to restart . For example, the whole machine oom、 The machine is blackened, etc . stay worker After being killed , After restart worker It will be reconnected according to the initial startup parameters admin. here ,admin Will take this. worker The restart message is encapsulated into Resource Revoked Send to the corresponding AM, Trigger bubble rerun.
  • Admin Failover: because Bubble The computing resources used by the job come from SMODE Of admin Resource pool , When admin For some reason Failover, perhaps SMODE When the overall service is restarted , Assigned to AM The compute node will be stopped .Admin stay Failover After that, it does not perceive the current allocation of each node AM Information , These restart messages cannot be sent to AM. The current treatment is , Every AM Subscribe to the admin Corresponding nuwa, stay admin This file will be updated after restart . AM After sensing the information update , Will trigger the corresponding taskAttempt Failed, thus rerun bubble.
  • Input Read Error: failing to read upstream data is a common error during compute-node execution. For a bubble, this error actually comes in three different forms:
    • InputReadError inside the bubble: since the shuffle data source is also inside the bubble, the corresponding upstream task is rerun anyway when the bubble reruns, so no special handling is needed.
    • InputReadError at the bubble boundary: the shuffle data source is produced by a task in an upstream offline vertex (or possibly another bubble). The InputReadError triggers a rerun of that upstream task, and the rerun of the current bubble is delayed until all data in its upstream lineage (in the new version) is ready, at which point scheduling is triggered.
    • InputReadError downstream of the bubble: if a task downstream of the bubble hits an InputReadError, the event triggers a rerun of one of the bubble's tasks; since the in-memory shuffle data that task depends on has already been released, this escalates into a rerun of the whole bubble.
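As a minimal sketch, the recovery actions above can be summarized as a dispatch over the failure type and, for read errors, the position of the failure relative to the bubble. All names here (`Bubble`, `handle_failure`, the enums) are invented for illustration and are not actual MaxCompute APIs:

```python
from enum import Enum, auto

class FailureKind(Enum):
    INSTANCE_FAILED = auto()    # runtime error, e.g. a retryable-exception
    RESOURCE_REVOKED = auto()   # worker killed (OOM, blacklisted machine) and restarted
    INPUT_READ_ERROR = auto()   # upstream shuffle data could not be read

class ReadErrorScope(Enum):
    INTERNAL = auto()    # shuffle producer is inside the bubble
    BOUNDARY = auto()    # producer is an upstream offline vertex or another bubble
    DOWNSTREAM = auto()  # the failing reader is a task downstream of the bubble

class Bubble:
    """Minimal stand-in that records which recovery actions were taken."""
    def __init__(self):
        self.actions = []

    def rerun(self):
        self.actions.append("rerun_bubble")

    def rerun_upstream_producer(self):
        self.actions.append("rerun_upstream_task")

    def delay_until_lineage_ready(self):
        self.actions.append("delay_until_lineage_ready")

def handle_failure(bubble, kind, scope=None):
    """Map a failure event to the recovery action described in the text."""
    if kind in (FailureKind.INSTANCE_FAILED, FailureKind.RESOURCE_REVOKED):
        bubble.rerun()
    elif kind is FailureKind.INPUT_READ_ERROR:
        if scope is ReadErrorScope.INTERNAL:
            bubble.rerun()                    # internal producer reruns with the bubble
        elif scope is ReadErrorScope.BOUNDARY:
            bubble.rerun_upstream_producer()  # regenerate the upstream data first
            bubble.delay_until_lineage_ready()
        elif scope is ReadErrorScope.DOWNSTREAM:
            # the in-memory shuffle data is already released, so a single
            # task rerun escalates into a rerun of the whole bubble
            bubble.rerun()
```

Note how every path except the boundary case ends in a full bubble rerun: with gang scheduling and in-memory shuffle, the bubble is the natural unit of recovery.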

2.3.2 Bubble Renew

When admin resources are tight, a bubble's resource request to the admin may time out while waiting. In some unusual cases, for example when the bubble requests resources in the window while the onlinejob service is restarting, the resource request can also fail outright. In these situations, all vertices in the bubble fall back to executing as plain offline vertices. In addition, a bubble whose rerun count exceeds the upper limit also triggers a bubble renew. After a bubble renew occurs, all of its internal edges are restored to sequential edges, and after all vertices are reinitialized, the internal scheduling state machine transitions are re-triggered by replaying all internal scheduling events, driving these vertices through their state transitions in the purely offline mode. This guarantees that after the fallback, every vertex in the bubble executes in the classic offline mode, which effectively ensures that the job terminates normally.


Fig. 13 Bubble Renew
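A rough sketch of the renew decision and the fallback steps described above; every field and function name here is hypothetical, chosen only to mirror the prose:

```python
from types import SimpleNamespace

def make_bubble(num_vertices, num_edges, max_reruns=3):
    """Build a minimal illustrative bubble; all fields are invented."""
    return SimpleNamespace(
        vertices=[SimpleNamespace(state="RUNNING") for _ in range(num_vertices)],
        internal_edges=[SimpleNamespace(mode="concurrent") for _ in range(num_edges)],
        max_reruns=max_reruns,
        rerun_count=0,
        resource_request_timed_out=False,
        replayed=False,
    )

def should_renew(bubble):
    # fall back when the resource request timed out (or failed outright),
    # or when the bubble has been rerun more times than the limit allows
    return bubble.resource_request_timed_out or bubble.rerun_count > bubble.max_reruns

def renew(bubble):
    """Restore every vertex in the bubble to classic offline execution."""
    for edge in bubble.internal_edges:
        edge.mode = "sequential"   # in-memory concurrent edges become sequential edges
    for vertex in bubble.vertices:
        vertex.state = "INITED"    # reinitialize before offline re-execution
    # replaying the persisted internal scheduling events then drives each
    # vertex through the offline state machine again (modeled here by a flag)
    bubble.replayed = True
```

The key design point the sketch mirrors is that renew is a one-way downgrade: once the edges are sequential and the vertices are reinitialized, the bubble's vertices behave exactly like an ordinary offline job.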

2.3.3 Bubble AM Failover

For ordinary offline jobs, the DAG framework persists the internal scheduling events related to each compute node, enabling incremental failover at the level of individual compute nodes. For bubble jobs, however, if an AM failover and restart happens while a bubble is executing, recovering the bubble by replaying the stored events could restore it to an intermediate running state. Because the internal shuffle data may have lived in memory and been lost, the unfinished compute nodes of a bubble restored to such an intermediate running state would fail immediately, being unable to read their upstream shuffle data.

This is essentially because, in the gang-scheduled bubble scenario, the bubble as a whole is the minimum unit of failover, so once an AM failover happens, recovery must also happen at the bubble granularity. For this reason, all scheduling events related to a bubble are treated as a unit at runtime, and a BubbleStartedEvent and BubbleFinishedEvent are flushed out when the bubble starts and finishes. After failover, all events belonging to a bubble are recovered as a whole: only the closing BubbleFinishedEvent means the bubble can be considered completely finished; otherwise, the entire bubble is rerun.

For example, in the case below, the DAG contains two bubbles (Bubble#0: {V1, V2} and Bubble#1: {V3, V4}). When the AM restarts, Bubble#0 is already TERMINATED and its BubbleFinishedEvent has been written, while in Bubble#1 V3 is also Terminated but V4 is still Running, so Bubble#1 as a whole has not reached its final state. After the AM recovers, V1 and V2 are restored to the Terminated state, and Bubble#1 is started over from scratch.


Fig 14. AM Failover with Bubbles
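The recovery decision after an AM failover can be sketched as a single scan of the persisted event log: a bubble counts as finished only if its BubbleFinishedEvent made it out before the restart. The log representation below (a list of event-name/bubble-id pairs) is invented for illustration:

```python
def bubbles_to_rerun(event_log):
    """Given persisted (event_name, bubble_id) pairs in write order, return
    the bubbles that must be rerun in full after an AM failover: every
    bubble that started but whose BubbleFinishedEvent was never flushed."""
    started, finished = set(), set()
    for event, bubble_id in event_log:
        if event == "BubbleStartedEvent":
            started.add(bubble_id)
        elif event == "BubbleFinishedEvent":
            finished.add(bubble_id)
    return started - finished

# The Fig. 14 scenario: Bubble#0 {V1, V2} finished, Bubble#1 {V3, V4} did not.
log = [
    ("BubbleStartedEvent", "Bubble#0"),
    ("BubbleFinishedEvent", "Bubble#0"),  # V1, V2 stay Terminated after recovery
    ("BubbleStartedEvent", "Bubble#1"),   # V4 was still Running at failover time
]
```

Here `bubbles_to_rerun(log)` yields only Bubble#1: even though V3 had terminated, the absence of Bubble#1's BubbleFinishedEvent forces the whole bubble to run again.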

3. Online effect

The Bubble mode has now been launched on the public cloud: 34% of SQL jobs execute with Bubble, with an average of 176K bubbles executed per day.

For queries with the same signature, we compared executions with bubble execution turned on and off. We found that, with overall resource consumption essentially unchanged, job execution performance improved by 34% and the amount of data processed per second increased by 54%.



Fig 15. Execution performance / resource consumption comparison

Beyond the overall comparison, we also performed a targeted analysis for VIP users. After a user's project turned on the Bubble switch (the point marked in red in the figure below is when Bubble was enabled), the average execution performance of its jobs improved significantly.


Fig 16. Comparison of a VIP user's average job execution time after enabling Bubble


Copyright notice
Author: Heart of machine. Please include a link to the original when reprinting, thank you.
