Kwai's Flink-Based Real-Time Data Warehouse Practice

2021-08-31 16:05:16 Heart of machine

I. Real-time computing scenarios at Kwai


Real-time computing in Kwai's business falls mainly into four areas:

  • Company-level core data: the company's overall business dashboard, the real-time core daily report, and mobile-side data. In other words, the team maintains the company-wide metrics, and each business line, such as video and live streaming, has its own core real-time dashboard;
  • Real-time metrics for major events: the core deliverable is the real-time big screen. For example, for Kwai's Spring Festival Gala event, we have one overall big screen showing the current state of the whole event. A large event is divided into N modules, and each module's gameplay has its own real-time dashboard;
  • Operations data: this mainly covers two subjects, creators and content. On the operations side, for example, when an event with a big-V (top influencer) goes live, we want to see information such as the real-time status of the live room and how much the live room lifts overall platform traffic. For this scenario we build various multidimensional real-time big-screen data, as well as some platform-level data.

    In addition, this includes support for operations strategy. For example, we may discover hot content, hot creators, and current trending topics in real time, and we output strategies based on them. This is another kind of support we need to provide;

    Finally, there is consumer-facing (C-end) data display. For example, Kwai has a creator center and a streamer center, which include pages such as the streamer's end-of-broadcast page; part of the real-time data on that page is also produced by us.

  • Real-time features: including search and recommendation features and real-time advertising features.

II. Kwai's real-time data warehouse architecture and safeguards

1. Goals and difficulties

1.1 Goals

  • First, because we are building a data warehouse, we want every real-time metric to have a corresponding offline metric, and the overall difference between the real-time and offline values must be within 1%. This is the minimum standard.
  • Second is data latency. The SLA is that during an event the data latency of all core reporting scenarios must not exceed 5 minutes. These 5 minutes include the time for a job to fail and recover; exceeding them means the SLA is not met.
  • Last is stability. In some scenarios, for example after a job restarts, the curve should remain normal, and the restart should not cause obvious anomalies in the metric output.
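
The 1% consistency goal can be written as a simple relative-difference check. This is only a sketch: the function name `within_tolerance` and the handling of a zero offline value are assumptions for illustration, not part of the original system.

```python
def within_tolerance(realtime: float, offline: float, tolerance: float = 0.01) -> bool:
    """Return True when the real-time value deviates from the offline
    value by at most `tolerance` (1% by default), relative to offline."""
    if offline == 0:
        # Assumption: with no offline volume, only an exact match passes.
        return realtime == 0
    return abs(realtime - offline) / abs(offline) <= tolerance


# A real-time UV of 1,005 against an offline UV of 1,000 differs by 0.5%.
print(within_tolerance(1005, 1000))
# A real-time UV of 1,020 against an offline UV of 1,000 differs by 2%.
print(within_tolerance(1020, 1000))
```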

1.2 Difficulties

  • The first difficulty is data volume. Overall daily ingress traffic is on the order of trillions of records. During events such as the Spring Festival Gala, peak QPS can reach 100 million per second.
  • The second difficulty is complex component dependencies. Some parts of a pipeline depend on Kafka, some on Flink, and others on KV storage, RPC interfaces, OLAP engines, and so on. We need to think about how to arrange the pipeline so that all of these components work properly.
  • The third difficulty is pipeline complexity. We currently have 200+ core business jobs and 50+ core data sources, with more than 1,000 jobs overall.

2. Real-time data warehouse: layered model

Given these three difficulties, let's look at the data warehouse architecture:


As shown in the figure above:

  • At the bottom are three different data sources: client logs, server logs, and binlog;
  • The common base layer is divided into two levels: the DWD layer, which holds detail data, and the DWS layer, which holds common aggregated data. DIM is what we usually call dimensions. We pre-partition by subject area, following the offline data warehouse; the subject areas may include traffic, users, devices, video production and consumption, risk control, social, and so on.

    • The core work of the DWD layer is standardized cleaning;
    • The DWS layer joins dimension data with the DWD layer and, after the join, produces aggregates at commonly used granularities.
  • Above that is the application layer, including platform-level dashboard data, multidimensional analysis models, and business-specific topical data;
  • At the top are the scenarios.

The whole process can be divided into three steps:

  • The first step is turning the business into data, which amounts to ingesting the business's data;
  • The second step is turning data into assets: the data is cleaned extensively to form regular, well-ordered data;
  • The third step is turning data into business value: at the real-time level, the data feeds back into the business and supports building business value from data.

3. Real-time data warehouse: safeguards

Based on the layered model above, let's look at the overall safeguards:

The safeguards fall into three parts: quality assurance, timeliness assurance, and stability assurance.

  • First, quality assurance (the blue part of the figure). At the data-source stage, we monitor out-of-order data in the sources, based on collection by our own SDK, and we check the consistency of the real-time sources against offline data. The computation side has three stages: development, launch, and service.

    • In the development stage we provide a standardized model; based on this model there are benchmarks, and we run offline comparison and verification to ensure consistent quality;
    • The launch stage focuses more on service monitoring and metric monitoring;
    • In the service stage, if something abnormal happens, we first restore the job from Flink state; if an unexpected situation arises, we repair the data as a whole from offline.
  • Second, timeliness assurance. For data sources, we also monitor source latency. The development stage involves two more things:

    • The first is stress testing. A regular job is tested against the peak traffic of the most recent 7 or 14 days to see whether it falls behind;
    • After passing the stress test, we evaluate the job's launch and restart performance, that is, how the job performs when it restarts after recovering from a checkpoint (CP).
  • Last, stability assurance. Much of this is done for large events, such as failover drills and tiered safeguards. We apply rate limiting based on the earlier stress-test results, so that a job stays stable even when traffic exceeds the limit, without widespread instability or checkpoint (CP) failures. Beyond that we have two standards: cold-standby dual data centers and hot-standby dual data centers.

    • Cold-standby dual data centers: when one data center goes down, we bring the jobs up in the other data center;
    • Hot-standby dual data centers: the same logic is deployed once in each of the two data centers.

Those are our overall safeguards.

III. Problems encountered at Kwai and their solutions

1. PV/UV standardization

1.1 Scenario

The first problem is PV/UV standardization. Here are three screenshots:


The first screenshot is the warm-up for the Spring Festival Gala, which is one kind of gameplay; the second and third are screenshots of the red-envelope activity and the live room on the day of the Gala.

During the event, we found that 60~70% of the requirements were to compute information about a page, such as:

  • How many people visited this page, or how many people clicked through to it;
  • How many people came to the event in total;
  • For a widget on the page, how many clicks and how many impressions it received.

1.2 Solution

This scenario can be abstracted as the following SQL:

Simply put, we filter a table, aggregate by dimensions, and then apply some COUNT or SUM operations.

Based on this scenario, our initial solution is shown on the right of the figure above.

We used the Early Fire mechanism of Flink SQL. Data is read from the Source and then bucketed by DID (the first purple part of the figure). The reason for bucketing first is to prevent a single DID from becoming a hot spot. After bucketing comes a Local Window Agg, which combines data of the same kind within each bucket. After the Local Window Agg, a Global Window Agg merges the buckets by dimension; merging buckets means computing the final result per dimension. The Early Fire mechanism amounts to opening a day-level window in the Local Window Agg and emitting its result once per minute.
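
The bucket-then-merge idea can be sketched outside Flink in a few lines of plain Python. This is an illustration under assumptions, not the production job: the bucket count, the `crc32` hash, and the function names are all hypothetical, and windowing and Early Fire are left out. Because each DID always lands in exactly one bucket, the per-bucket distinct counts can simply be summed to get an exact UV.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # assumed bucket count, for illustration only


def bucket_of(did: str) -> int:
    """Map a device ID to a stable bucket so load spreads across workers."""
    return zlib.crc32(did.encode("utf-8")) % NUM_BUCKETS


def local_agg(events):
    """Stage 1: aggregate per (dimension, bucket).
    `events` is an iterable of (did, page) pairs."""
    partial = defaultdict(lambda: {"dids": set(), "pv": 0})
    for did, page in events:
        slot = partial[(page, bucket_of(did))]
        slot["dids"].add(did)  # distinct devices seen in this bucket
        slot["pv"] += 1        # every event counts as one page view
    return partial


def global_agg(partial):
    """Stage 2: merge the buckets of each dimension into the final PV/UV."""
    result = defaultdict(lambda: {"uv": 0, "pv": 0})
    for (page, _bucket), slot in partial.items():
        result[page]["uv"] += len(slot["dids"])
        result[page]["pv"] += slot["pv"]
    return dict(result)


events = [("d1", "home"), ("d2", "home"), ("d1", "home"), ("d3", "live")]
print(global_agg(local_agg(events)))
```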

We ran into some problems in this process, shown in the lower left corner of the figure above.

Nothing goes wrong when the job runs normally. However, if the data as a whole is delayed, or we backfill historical data, a problem appears. Say Early Fire triggers once per minute: because the volume is large during a backfill, a backfill started at 14:00 may read straight through to the 14:02 data, so the 14:01 point is lost. What happens when it is lost?

In this case, the curve at the top of the figure is the Early Fire result when backfilling historical data. The x-axis is minutes and the y-axis is the page's cumulative UV up to that time. Some segments are flat, meaning no results were produced, followed by a sharp jump, then flat again, then another sharp jump. The expected result is the smooth curve at the bottom of the figure.

To solve this, we used a Cumulate Window solution. This solution is also included in Flink 1.13, and the principle is the same.


A large day-level window is opened over the data, with small minute-level windows inside it. Each record falls into a minute window according to its own Row Time.

  • When the watermark passes a window's event_time, it triggers one emission. This solves the backfill problem: data falls into the window where it really belongs, and as the watermark advances, output is triggered at the end of each window.
  • In addition, this approach handles out-of-order data to some extent. Out-of-order data is not discarded; it is kept in state, and the latest cumulative value is recorded.
  • Finally, semantic consistency: because the computation is based on event time, when disorder is not severe the results agree closely with those computed offline.
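
To make the difference from Early Fire concrete, here is a small plain-Python simulation of the cumulative idea. It is not Flink code, and the function name and the (minute, did) input shape are assumptions for illustration. Records are assigned to minute slots by their own event time, and each minute's output is the distinct count from the start of the day up to that minute, so the curve has one point per minute regardless of how fast or in what order the records are read.

```python
from collections import defaultdict


def cumulative_uv_by_minute(events):
    """Return {minute: cumulative UV} for every minute between the first
    and last event minute. `events` is an iterable of (event_minute, did),
    with minutes counted from the start of the day."""
    per_minute = defaultdict(set)
    for minute, did in events:
        # Assign by event time, not by arrival order.
        per_minute[minute].add(did)
    if not per_minute:
        return {}

    seen = set()
    curve = {}
    for minute in range(min(per_minute), max(per_minute) + 1):
        # Each minute window closes with the running distinct count,
        # even when the minute itself received no new devices.
        seen |= per_minute.get(minute, set())
        curve[minute] = len(seen)
    return curve


# Arrival order is shuffled, as it might be during a backfill.
print(cumulative_uv_by_minute([(2, "a"), (0, "a"), (1, "b"), (2, "c")]))
```

The output has a value for minutes 0, 1, and 2 even though the minute-2 record was read first, which is the property the flat-then-jump Early Fire curve lacked.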

That is the standardized solution for PV/UV.

2. DAU calculation

2.1 Background

Next, DAU calculation:

We monitor active devices, new devices, and returning devices across the whole platform quite closely.

  • Active devices are devices that visited on the current day;
  • New devices are devices that visited on the current day and had never visited before;
  • Returning devices are devices that visited on the current day but had not visited in the previous N days.
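
The three definitions can be sketched as a small tagging function. The assumptions here are hypothetical: each device has a first-seen and last-seen date as of yesterday (`None` if it has never been seen), N defaults to 30 days, and "not seen in N days" is taken to mean a gap strictly greater than N days. The function name `device_tags` is also made up for the example.

```python
from datetime import date
from typing import Optional


def device_tags(today: date,
                first_seen: Optional[date],
                last_seen: Optional[date],
                n_days: int = 30) -> set:
    """Tags for a device that visited today. `first_seen` and `last_seen`
    describe its history as of yesterday; None means no history."""
    tags = {"active"}  # every device reaching this function visited today
    if first_seen is None:
        tags.add("new")  # no history at all
    elif last_seen is not None and (today - last_seen).days > n_days:
        # Assumed boundary: a gap strictly longer than n_days.
        tags.add("returning")
    return tags


today = date(2021, 8, 31)
print(device_tags(today, None, None))
print(device_tags(today, date(2021, 1, 1), date(2021, 7, 1)))
print(device_tags(today, date(2021, 1, 1), date(2021, 8, 30)))
```

New and returning devices come out as extra tags on an active device, so all three metrics can be derived from one deduplicated stream of active devices.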

But we may need 5~8 different topics to compute these metrics.

Let's look at how the logic would be computed in the offline process.

First we compute active devices: merge the sources together, deduplicate at the day level under each dimension, and then join a dimension table. This dimension table holds each device's first and last visit times, that is, the first and last times the device visited as of yesterday.

With this information we can do the logical computation, and we find that new and returning devices are really sub-tags of active devices. New devices require one logical step, and returning devices require a 30-day check. Given such a solution, can we simply write one SQL query to solve the problem?

In fact, that is what we did at first, but we ran into some problems:

  • First: there are 6~8 data sources, and the definition of our platform-level metrics is fine-tuned often. With a single job, every fine-tuning requires a change to that job, so its stability would be very poor;
  • Second: the data volume is in the trillions, which causes two problems. First, a single job at this scale is very unstable. Second, no KV store or RPC service interface used for the real-time dimension-table join can guarantee service stability at trillion-record scale;
  • Third: our latency requirement is strict, under one minute. The whole pipeline must avoid batch processing, and if a task has a single-point performance problem, we still need to guarantee high performance and scalability.

2.2 Technical solution

Here is how we addressed the problems above:

As in the example above, the first step is to deduplicate each of the three data sources A, B, and C at the minute level by dimension and DID. This yields three minute-level deduplicated sources, which are then unioned together and passed through the same logic.

This shrinks the data entering the pipeline from trillions to tens of billions of records. The minute-level deduplication is followed by a day-level deduplication, after which the data shrinks from tens of billions to billions.
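
The staged deduplication can be sketched as follows. This is a plain-Python illustration, not the Flink jobs themselves; the record shape `(did, dim, minute)` and the function names are assumptions made for the example.

```python
def dedup(records, key):
    """Yield only the first record for each key, preserving input order."""
    seen = set()
    for rec in records:
        k = key(rec)
        if k not in seen:
            seen.add(k)
            yield rec


def daily_active(sources):
    """`sources` is a list of iterables of (did, dim, minute) records,
    one iterable per upstream topic."""
    merged = []
    for src in sources:
        # Step 1: minute-level dedup inside each source, by dim + DID + minute.
        merged.extend(dedup(src, key=lambda r: (r[0], r[1], r[2])))
    # Step 2: union the sources, then day-level dedup by dim + DID.
    return list(dedup(merged, key=lambda r: (r[0], r[1])))


source_a = [("d1", "ios", 1), ("d1", "ios", 1), ("d1", "ios", 2)]
source_b = [("d1", "ios", 1), ("d2", "android", 1)]
print(daily_active([source_a, source_b]))
```

Each stage only has to hold state for its own key space, and every stage reduces the volume the next one sees, which is what makes the later RPC join feasible.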

With billions of records, joining a data service becomes feasible: we call the user-profile RPC interface and, after getting the RPC result, write the output to the target topic. This target topic is imported into the OLAP engine to serve multiple services, including the mobile service, the big-screen service, and the metric dashboard service.

This solution has three advantages: stability, timeliness, and accuracy.

  • First, stability. Loose coupling simply means that when the logic of data source A or data source B needs to change, each can be modified separately. Next is task scalability: because we split all the logic at a very fine granularity, a problem somewhere, such as a traffic problem, does not affect the later parts, so scaling out is relatively simple. In addition, the service call is placed late in the pipeline and the state is controllable.
  • Second, timeliness. We achieve millisecond latency with rich dimensions: overall, multidimensional aggregation over 20+ dimensions.
  • Finally, accuracy. We support data validation, real-time monitoring, a unified model output, and so on.

At this point we hit another problem: disorder. Each restart of one of the three jobs above causes a delay of at least about two minutes, and the delay makes the downstream unioned data out of order.

2.3 Delayed-computation scheme

How do we handle the disorder described above?

We have three schemes in total:

  • Scheme 1 deduplicates on "did + dimension + minute", with the Value set to "seen or not". For example, for the same did, a record arrives at 04:01 and a result is output. Likewise, results are output for 04:02 and 04:04. If another 04:01 record arrives it is discarded, but if a 04:00 record arrives a result is still output.

    This scheme has a problem: because state is kept per minute, keeping 20 minutes of state takes twice as much space as keeping 10 minutes, and the state size eventually becomes hard to control. So we moved on to scheme 2.

  • Scheme 2 rests on an assumption: that the data sources are never out of order. In this case the key is "did + dimension" and the Value is a timestamp; its update procedure is shown in the figure above.

    A record arrives at 04:01 and a result is output. A record arrives at 04:02; if it is the same did, the timestamp is updated and a result is still output. 04:04 follows the same logic, and the timestamp is updated to 04:04. If a 04:01 record arrives afterwards, the timestamp is found to have already advanced to 04:04, and the record is discarded.

    This approach greatly reduces the state required, but it has zero tolerance for disorder. Since we could not solve that problem, we came up with scheme 3.

  • Scheme 3 builds on scheme 2's timestamp by adding a ring-like buffer; disorder is tolerated within the buffer.

    For example, a record arrives at 04:01 and a result is output. A record arrives at 04:02: the timestamp is updated to 04:02, and the buffer records that the same device was also seen at 04:01. If another record arrives at 04:04, the buffer is shifted by the corresponding time difference. This logic ensures that a certain amount of disorder can be tolerated.
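
One way to realize scheme 3 is to keep, per "did + dimension" key, the latest minute seen plus a fixed-width bitmask in which bit i means "a record for minute latest - i has already been output". The sketch below is an assumption about how such a ring-like buffer could work, not the authors' actual implementation; the class name `MinuteDeduper`, the integer minute encoding, and the bitmask representation are all hypothetical. The 16-minute width matches the tolerance quoted in the article.

```python
WINDOW = 16  # minutes of tolerated disorder


class MinuteDeduper:
    """Per-key minute-level dedup that tolerates bounded disorder."""

    def __init__(self, window: int = WINDOW):
        self.window = window
        self.state = {}  # (did, dim) -> (latest_minute, bitmask)

    def accept(self, did: str, dim: str, minute: int) -> bool:
        """Return True if the record should be output, False if dropped."""
        key = (did, dim)
        full = (1 << self.window) - 1
        if key not in self.state:
            self.state[key] = (minute, 1)  # bit 0 marks the latest minute
            return True

        latest, mask = self.state[key]
        if minute > latest:
            # Newer minute: shift history by the time difference, mark bit 0,
            # and let anything older than the window fall off the end.
            mask = ((mask << (minute - latest)) | 1) & full
            self.state[key] = (minute, mask)
            return True

        offset = latest - minute
        if offset >= self.window:
            return False  # too late: outside the tolerated disorder
        if mask & (1 << offset):
            return False  # this minute was already output: duplicate
        self.state[key] = (latest, mask | (1 << offset))
        return True  # late but within the buffer, and not seen before


d = MinuteDeduper()
# Minutes since midnight: 04:01 = 241, 04:02 = 242, 04:04 = 244, 04:00 = 240.
for m in (241, 242, 244, 241, 240, 240):
    print(m, d.accept("did-1", "ios", m))
```

The state per key is one integer timestamp plus one fixed-width integer, so it stays close to scheme 2 in size while reproducing scheme 1's behavior inside the window: the repeated 04:01 record is dropped, and the late 04:00 record is still output once.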

Comparing the three schemes:

  • Scheme 1, tolerating 16 minutes of disorder, has a single-job state size of about 480 GB. Although this guarantees accuracy, job recovery and stability are completely uncontrollable, so we abandoned it;
  • Scheme 2 has a state size of about 30 GB but zero tolerance for disorder, so the data is inaccurate. Because our accuracy requirements are very high, we abandoned this scheme too;
  • Scheme 3's state does grow, but not by much, and overall it achieves the same effect as scheme 1. Scheme 3 tolerates 16 minutes of disorder; a normal job update needs no more than 10 minutes to restart, so we finally chose scheme 3.

3. Operations scenarios

3.1 Background

The operations scenarios can be divided into four parts:

  • The first is big-screen data support, covering analysis data for a single live room and for the platform as a whole. It requires minute-level latency and has fairly high update requirements;
  • The second is live-stream dashboard support. Dashboard data is analyzed along specific dimensions and for specific audiences, so it has high requirements for dimensional richness;
  • The third is data strategy leaderboards, which mainly predict popular works and hits. They require hour-level data and have low update requirements;
  • The fourth is consumer-facing (C-end) real-time metric display. Query volume is quite large, but the query patterns are relatively fixed.

Below we analyze the different scenarios that these four cases produce.

The first three are basically the same, differing only in query pattern: some are specific business scenarios and some are general business scenarios.

For the third and fourth, update requirements are low and throughput requirements are relatively high, and the curve over time does not need to be consistent. The fourth query pattern is mostly single-entity lookups, for example querying a piece of content for its metrics, and it has quite high QPS requirements.

3.2 Technical solution

For the four scenarios above, how do we do it?

  • First, the base detail layer (left of the figure). The data source has two pipelines. One is the consumption stream, such as live-stream consumption information and views/likes/comments. It goes through a round of basic cleaning and then dimension management. The upstream dimension information comes from Kafka; Kafka carries content dimensions, which are written into KV storage along with some user dimensions.

    After these dimensions are joined, the result is written to the DWD fact layer in Kafka. To improve performance here, we added a two-level cache.

  • As shown at the top of the figure, we read the DWD layer data and build the basic summaries. The core is windowed dimension aggregation producing data at four granularities: the platform-level multidimensional summary topic, the live-room multidimensional summary topic, the author multidimensional summary topic, and the user multidimensional summary topic. These are all data of common dimensions.
  • As shown at the bottom of the figure, based on these common-dimension data we then process data with personalized dimensions, that is, the ADS layer. After obtaining the data we expand dimensions, including content expansion and operations-dimension expansion, and then aggregate, producing for example the e-commerce real-time topic, the institutional-service real-time topic, and the big-V live-stream real-time topic.

    Splitting into two pipelines like this has a benefit: one place handles common dimensions and another handles personalized dimensions. Common dimensions have higher safeguard requirements, while personalized dimensions carry a lot of custom logic. If the two were coupled together, tasks would fail often, it would be unclear which task is responsible for what, and a stable layer could not be built.

  • As shown on the right, we finally use three different engines. In short, Redis queries serve the consumer-facing (C-end) scenarios, and OLAP queries serve the big-screen and business-dashboard scenarios.
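
The article does not say how the two-level cache used for the dimension join in the detail layer is built. One plausible reading, sketched here purely as an assumption, is a small in-process LRU cache in front of the remote KV store, so that repeated lookups for the same key avoid a network round trip. The class name, the plain dict standing in for the KV store, and the capacity are all hypothetical.

```python
from collections import OrderedDict


class TwoLevelLookup:
    """Dimension lookup with a local LRU cache in front of a KV store."""

    def __init__(self, kv_store: dict, capacity: int = 2):
        self.kv_store = kv_store    # stand-in for the remote KV storage
        self.capacity = capacity
        self.local = OrderedDict()  # level 1: in-process LRU cache
        self.remote_reads = 0       # counts level-2 (KV) accesses

    def get(self, key):
        if key in self.local:
            self.local.move_to_end(key)  # mark as most recently used
            return self.local[key]
        self.remote_reads += 1
        value = self.kv_store.get(key)   # level 2: fall back to the KV store
        self.local[key] = value
        if len(self.local) > self.capacity:
            self.local.popitem(last=False)  # evict the least recently used
        return value


kv = {"u1": "A", "u2": "B", "u3": "C"}
lookup = TwoLevelLookup(kv, capacity=2)
lookup.get("u1")
lookup.get("u1")  # served from the local cache
print(lookup.remote_reads)
```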

IV. Future plans

Three scenarios were covered above: the first is standardized PV/UV calculation, the second is the overall DAU solution, and the third is how to solve operations-side problems. Based on these, we have some plans for the future, in four parts.

  • The first part is improving the real-time safeguard system:

    • First, for large events, including the Spring Festival Gala and subsequent regular events, we will build a set of standards and a platform for how to safeguard them;
    • Second, defining tiered safeguard standards: there will be a standardized description of which jobs get which safeguard level and standard;
    • Third, pushing solutions through engine and platform capabilities, including some engines for Flink tasks. We will have a platform for this and promote standards and standardization based on it.
  • The second part is building out real-time data warehouse content:

    • On one hand, delivering scenario-based solutions: for example, general solutions for events, rather than developing a new solution every time;
    • On the other hand, consolidating content data into layers: for example, in current data content there are gaps in depth for some scenarios, including how content can better serve upstream scenarios.
  • The third part is building out Flink SQL scenarios, including continued SQL adoption, SQL task stability, and SQL task resource utilization. When estimating resources, we consider, for example, what kind of SQL solution fits what QPS scenario and how much load it can sustain. Flink SQL can greatly reduce manpower cost, and in this process we want to make things simpler for the business.
  • The fourth part is exploring unified batch and stream processing. The real-time data warehouse scenario is really about accelerating offline ETL computation. We have many hour-level jobs; for these, some of the logic in each batch run can be moved into stream processing, which greatly improves the SLA of the offline data warehouse.

