systemDesign

Designing Data-Intensive Application - Stream Processing - Processing Streams

這是Designing Data-Intensive Application的第三部分第二章節Part3: 處理流

流處理Part1 - 傳遞事件流

流處理Part2 - 數據庫與流

流處理Part3 - 處理流

本篇是系列文的Part3

本文所有圖片或代碼來自於原書內容

處理流

我們在前兩篇系列文中 討論了流的來源(用戶活動事件, 寫入數據庫等等) 和流如何傳輸(直接通過消息傳送,通過消息代理,通過事件日誌等等)

最後就是你可以用流做什麼 以及如何處理它 一般來說有三個選項

1.你可以將事件中的數據寫入數據庫/緩存/搜索索引或類似的衍生存儲系統 然後被其他客戶端查詢

2.你能以某種方式將事件推送給用戶 比如發送郵件或推送通知

3.你可以處理一個或多個輸入流 並產生一個或多個輸出流

本篇文章會著重在第三項 處理流以產生其他衍生流

流處理的應用

長期以來 流處理一直用於監控目的 如果某個事件發生 單位希望能得到警報 比如說

1.欺詐檢測系統需要確定信用卡的使用模式是否有意外地變化 如果系統判斷此卡被盜刷 要馬上鎖卡

2.交易系統需要檢查金融市場的價格變化 並依指定的規則進行交易

3.製造系統需要監控工廠中機器的狀態 如果有故障要馬上找出問題

4.軍事和情報系統需要跟蹤潛在侵略者的活動 如果偵測到襲擊要預先應對

隨著時代進步 軟體業界也有越來越多關於流的應用

應用1: 復合事件處理 Complex event processing

適用於需要搜索某些事件模式的應用 就像regular expression允許你在字符串中搜索特定字符模式的方式 CEP允許你指定規則以在流中搜索某些事件模式

CEP系統通常使用高層次的聲明式查詢語言(比如SQL)來描述應該檢測到的事件模式 通常也會在內部維持一個state machine(狀態機) 當匹配發現時 引擎發出一個復合事件(complex event) 並附有檢測到的事件模式細節

你也可以從另一個角度理解 復合事件處理和一般數據庫的使用方式正好相反 數據庫是將存儲資料視為最重要目的 查詢只是臨時的 過了沒找到就沒了 但復合事件處理則是把查詢當作最重要目的 流過了不符合就不管了

應用2: 流分析

使用流處理的另一個領域是對流進行分析 通常是關注大量事件上的指標 比如

1.測量某種類型事件發生的頻率

2.滾動計算一段時間窗口內某個值的平均值

3.將當前的統計值與先前的時間區間的值對比(檢測趨勢 當指標比上週高或低時通知)

這些統計值通常是在固定時間區間內進行計算的 比如說過去五分鐘的QPS跟P99

目前已經有許多分佈式流處理框架的設計都是針對分析設計的 比如Apache Storm, Spark Streaming, Flink, Concord, Samza, Kafka Streams, Google Cloud Dataflow, Azure Stream Analytics

應用3: 維護物化視圖 Maintain materialized views

我們知道數據庫的Change Capture可以用於維護衍生數據系統(緩存, 搜索索引和數據倉庫) 使其與來源數據庫保持同步 我們可以把這些視為物化視圖(materialized views): 在某個數據集上衍生出一個替代視圖以便高效查詢,並在底層數據變更時更新視圖

事件溯源中 應用程序的狀態是透過apply事件日誌來維護 通常僅考慮某個時間窗口內的事件通常是不夠的 構建物化視圖可能需要任意時間段內的所有事件(除了那些可能由日誌壓縮丟棄的過時事件)

實際上 你需要一個可以一直延伸到時間開端的窗口 Samza和Kafka Streams支持這種用法

應用4: 在流上搜索

我們可以搜尋某些單獨的事件

比如說 當市場出現了符合某消費者要求的房地產 房地產網站可以通知客戶

傳統的搜尋引擎 是首先索引文件 然後在索引上跑查詢 但搜索流卻反了過來 查詢被存了下來 逐一搜尋流的每個事件

Elasticsearch的這種過濾器功能 是實現這種流搜索的一種選擇

時間推理

看完以上的應用 你會發現我們在處理流的時候 幾乎都需要跟時間打交道 尤其是用於分析目的時候

不像批處理 對於批處理的運算通常是確定性的 也就是不論你什麼時間運算 因為事件本身已經有個時間戳 所以運算完結果會一樣

但對流處理來說 許多框架使用處理機器上的本地系統時鐘來確定窗口 概念簡單 就假設事情發生跟事情處理的時間一樣 但要是真的有顯著的延遲 那很多處理都會不準確

事件時間與處理時間

很多原因會導致處理延遲 比如說排隊 網路故障 消息代理的性能問題 等等

更麻煩的是 消息延遲還可能導致無法預測消息順序 比如說事件A比事件B先被發出 但事件B先到達流處理器 即使它們實際上是以相反的順序發生的

所以 將事件時間和處理時間搞混會導致錯誤的數據

既然可能延遲 那我就用事件時間當基準吧

用事件時間來定義窗口 也有一個棘手的問題 那就是你永遠也無法確定是不是已經收到了特定窗口的所有事件 還是說還有一些事件還在來的路上

比如說你要分析每分鐘的請求數 10:37的數據已經接收處理得差不多了 但你要等到幾點幾分才能確定再也不會有10:37這個分鐘裡面的事件再傳過來呢

假設你等到10:40 你處理完資料後 有一個事件來了 你有兩個處理方式

1.忽略這些滯留(straggler)事件: 因為這通常只是一小部分 而且還可以監控滯留事件的數目 如果太多就發出警報

2.發佈一個更正(correction): 一個包括滯留事件的更新窗口值 比較麻煩 需要收回以前的分析輸出

你用的是誰的時鐘

更麻煩的問題來了 如果一個事件可能在很多不同的地方進行緩衝(buffered) 要為這個事件發配時間戳就更難了

比如說 蘋果手機有些應用 會不定時向蘋果主機回報一些資訊 比如用量等等 但如果手機連不到網路 這些事件就會被待在手機裡緩衝 等到下次連到網路的時候再向服務器上報 這可能是幾小時或是幾天 對於這個流的任意消費者而言 它們就如延遲極大的滯留事件

在這個例子中 事件應該要被記錄成用戶交互發生的時間 取決時移動設備的本地時鐘 但用戶控制的設備上的時鐘通常是不可信的

要校正不正確的設備時鐘 一般來說需要記錄三個時間戳

1.事件發生的時間 取決於設備時鐘

2.事件發送往服務器的時間 取決於設備時鐘

3.事件被服務器接收的時間 取決於服務器時鐘

通過從第三個時間戳中減去第二個時間戳 可以估算設備時鐘和服務器時鐘之間的偏移(網絡延遲忽略) 然後可以將事件時間戳減去這個偏移 就有事件實際發生的真實時間

窗口的類型

當你知道如何確定一個事件的時間戳後 下一步就是如何定義時間段的窗口 以下是常用的窗口

1.滾動窗口(Tumbling Window): 滾動窗口有著固定的長度 每個事件都僅能屬於一個窗口

假設你有一個1分鐘的滾動窗口 則所有時間戳在10:03:0010:03:59之間的事件會被分組到一個窗口中 10:04:0010:04:59之間的事件被分組到下一個窗口

2.跳動窗口(Hopping Window)

跳動窗口也有著固定的長度 但允許窗口重疊

假設你有一個長度五分鐘 hopping 1分鐘的窗口 那第一個窗口包含10:03:0010:07:59之間的事件 而下一個窗口將覆蓋10:04:0010:08:59之間的事件 依此類推

3.滑動窗口(Sliding Window)

滑動窗口包含了彼此間距在特定時長內的所有事件 沒有一個很準確的分隔 無時無刻都在多動

比如一個5分鐘的滑動窗口 應該包含10:03:3910:08:12這兩個事件 因為他們差距小於五分鐘 但跳動或滾動則會把他們分開 因為他們有很明確的邊界

4.會話窗口(Session window)

會話窗口沒有固定的持續時間 我們將一用戶出現時間相近的所有事件分組在一起 等到用戶很久沒動作了(比如30min) 我們就定義窗口結束

以Session來區分事件是個很常見的方式

流式連接 Stream Joins

我們知道批次處理中很常會需要Join兩個不同的table 在流處理中也一樣 但是對於一個隨時會出現的事件 這困難度就比已經批處理高上太多

為了更簡單瞭解 我們分成三種情況 stream-stream join, stream-table join, table-table join

stream-stream join(流流連接, 窗口連接)

用例子切入 你的搜尋網站有兩個stream:

1.當使用者搜尋一個詞的時候 發出一個event 包含查詢的詞和返回結果

2.當使用者點擊其中一個返回結果 就發出另一個event 記錄這個點擊事件

那今天如果我想要計算網站中給定一個搜索詞時每一個URL的點擊率 就必須將這兩個流(搜索點擊)利用sessionId join起來

你可能會說 為什麼不在點擊的時候 順便記錄搜索的詞就搞定了 幹嘛要JOIN?

如果我們這樣做 我們就無法知道使用者搜索後但都不點擊的情況 而這通常也是一個搜尋引擎想知道的情況(可能代表你的網站不夠好 使用者查完後對於返回結果沒興趣)

在這個例子 連接Join兩個流是必須的

要做到這點 流處理器需要維護一個狀態(state) 比如說在這個例子 我們需要按照sessionId去index最近一小時內發生的事件

當有搜尋事件或是點擊事件發生時 就會被加到正確的sessionId的索引上 如果同一個索引上在一小時內看到了這兩種事件 就再發出一個搜尋且點擊的事件 如果只有搜尋事件 就發出搜尋且未點擊的事件

stream-table join(流表連接, 流擴展)

上例子 我們有一個描述用戶活動事件的流(事件中包含userId) 還有一個數據庫 裡面有每個userId的更多資料比如userName, userAge等等

我們想要消費這個描述用戶活動事件的流 消費之後發一個數據庫查詢擴充(enriching)這個事件的資料 然後再發出另一個流

這是很常見的應用

我們可以將數據庫副本加載到流處理器中 以便在本地進行查詢而無需網路往返 如果副本夠小 甚至可以直接存成hashMap放在內存 大一點也可以存成index放在硬碟

那如果存在內存 使用者突然更新的檔案怎麼辦 比如說突然改名字或是改地址?

沒錯 我們可愛的Change Capture又發揮用途了 我們讓流處理器同時消費兩個事件 活動事件跟使用者數據變更事件 當發生活動事件時 把userId去查詢內存的本地資料庫擴充事件 當發生數據變更事件時 更新本地資料庫

流表連接實際上非常類似於流流連接 你就把表當成一個可以追溯到最一開始的流

table-table join(表表連接, 維護物化視圖)

其實就是Database的table join

容錯

我們來到的流處理章節的最後一節 了解一下流處理如何容錯

我們知道批處理很容易容錯 任何一個MapReduce出錯的話 可以簡單地在另一台機器上再次啟動 並放棄失敗的任務 簡單的理由是因為輸入文件不可變

對於批處理的輸出 我們可以很有信心的說 所有的輸入都被處理了剛好一次(exactly once)

在流處理中處理起來就沒有那麼直觀 等待某個任務完成之後再使其輸出可見並不是一個可行選項 因為你永遠無法處理完一個無限的流

微批次與存檔點

微批次

一個解決方案是將流分解成小塊 並像微型批處理一樣處理每個小塊 稱為微批次(microbatching) Spark Streaming就是這種用法 批次的長短大約是一秒 較小的批次會導致更大的調度與協調開銷 較大的批次意味著延遲更長

微批次需要一個滾動窗口 來知道哪些事件需要被一起處理

存檔點

但Apache Flink使用不同的方法 它會定期生成狀態的滾動存檔點並將其寫入持久存儲 如果運算的機器崩潰 新的機器從上個存檔點開始讀取 再繼續往下執行

原子提交

雖然大多時候 微批次和存檔點也跟批處理一樣達成了exactly once的要求 但如果我們的處理中包含了一些跟外部互動的副作用(寫入數據庫, 向外部消息代理發送消息, 發送電子郵件等等) 那如果在這些外部互動發生完後發現東西有錯 就無法挽回了 因為如果重新跑一次這些副作用也會再發生一次

為了在出現故障時表現出恰好處理一次的樣子 我們需要確保事件處理的所有輸出和副作用若且唯若處理成功時才會生效

我們希望這些副作用要馬都原子地發生 要馬都不發生

Idempotence

除了原子性之外 有另一種方法可以讓我們安全的重試 那就是依賴冪等性(idempotence)

冪等性指的是一個指令執行一次 跟執行多次的效果是一樣的 比如說

set X = 2 就是冪等的 因為執行一次跟多次都一樣 沒有副作用

Count++ 就不是冪等的 因為執行一次跟執行兩次結果會不一樣

即使一個操作原本不是冪等的 但我們可以用一些metadata紀錄資料 讓這個操作變成冪等 比如說在消費來自Kafka的消息時 每條消息都有一個持久的 單調遞增的偏移量 只要在寫到外部數據的時候 附上這個偏移量 你就可以判斷一條更新是不是已經執行過了 因而避免重複執行

僅僅需要一點點額外的開銷 就可以讓一個操作變成冪等 進而達成恰好一次的要求

失敗後重建狀態

任何需要維護狀態的流處理(計數器 平均 直方圖等等) 都必須確保在失敗之後能恢復其狀態

通常有兩個做法

1.將狀態保存在遠程數據存儲中 並進行複製: 你可以存在遠端數據庫裡 也可以存在本地內存

2.直接從輸入流中重建: 如果狀態是從相當短的窗口中聚合而成 那就簡單的replay該窗口中的輸入事件就可以

總結

我們在流處理這個大章節中 討論了

1.何謂事件流

2.事件流的目的

3.如何處理事件流

事件流其實非常像批處理 只是流是在無限的輸入上持續進行 從這個角度來看 消息代理和事件日誌可以視作文件系統的流式等價物

就流的來源而言 我們討論了幾種可能性: 用戶活動事件, 定期讀數的傳感器, Feed數據等等都可以自然地用流表示 我們發現將數據庫寫入視作流也是很有用的 我們可以捕獲changelog(不論是隱式的用變更數據捕獲還是顯式的通過事件溯源)

將數據庫表示為流為系統集成帶來了很多強大應用 通過消費變更日誌並將其應用至衍生系統 比如搜尋引擎 緩存等等

然後我們討論了流處理中對時間進行推理的困難 包含處理時間與事件時間戳之間的區別 以及各種窗口的定義

再來我們區分了流處理中可能出現的三種連接類型

1.流流連接: 兩個輸入流都由活動事件組成 Join operator會在某個窗口內搜索相關的事件 甚至某些時候這兩個輸入流是同一個(Self-Join)

2.流表連接: 可以想成一個輸入流由活動事件組成 另一個輸入流是數據庫變更日誌

3.表表連接: 可以想成兩個輸入流都是數據庫變更日誌

最後 我們討論了在流處理中實現容錯和恰好一次的技術 是由微批次, 存檔點, 原子性或冪等寫入達成