Trang chủ > Phát triển di động > Nội dung chính

Làm thế nào để mô tả hình ảnh cơ chế kiểm soát áp lực và điều khiển dòng chả


Trước đâybầu cua, tôi đã được mời trả lời một câu hỏi trên Zhihu về cơ chế backpressure của RxJava. Hôm nay, tôi sẽ tóm tắt lại những gì đã chia sẻ, hy vọng điều này sẽ hữu ích cho nhiều người hơn. Backpressure là một khái niệm quan trọng trong việc xử lý luồng dữ liệu, đặc biệt khi dữ liệu được tạo ra với tốc độ cao hơn khả năng xử lý. Hiểu rõ về nó không chỉ giúp chúng ta tối ưu hóa hiệu suất mà còn tránh được các vấn đề như tràn bộ nhớ hoặc lỗi ứng dụng.

Khi một chuỗi tạo ra dữ liệu với tốc độ nhanh hơn khả năng xử lý của người tiêu dùngbacarat, RxJava cung cấp các chiến lược để giải quyết vấn đề này. Cơ chế Backpressure cho phép người phát hiện ra tốc độ sản xuất dữ liệu và điều chỉnh dựa trên khả năng tiếp nhận của người tiêu dùng. Điều này giúp tránh tình trạng quá tải bộ nhớ hoặc lỗi do không thể xử lý kịp thời.

https://github.com/ReactiveX/RxJava/wiki/Backpressure

miêu tả sinh động

Đầu tiênbacarat, xét về góc độ tổng thể, tựa đề của tài liệu trên, mặc dù được đặt tên là “Backpressure” (áp lực ngược), nhưng thực tế đang đề cập đến một chủ đề lớn hơn nhiều — đó là “Flow Control” (điều khiển dòng chảy). Backpressure chỉ là một phương án trong số các giải pháp củ

Trong RxJavabacarat, bạn có thể tạo ra một chuỗi gọi (call chain) bằng cách liên tục áp dụng nhiều Operator lên đối tượ Quá trình này cho phép dữ liệu di chuyển từ phía nguồn (upstream) đến phía người dùng (downstream). Tuy nhiên, khi tốc độ gửi dữ liệu từ nguồn nhanh hơn tốc độ xử lý của phía người dùng, thì việc kiểm soát dòng chảy (Flow Control) trở nên cần thiết để đảm bảo hiệu suất và tránh tình trạng quá tải.

Điều này giống như một bài toán toán học mà chúng ta từng làm ở trường tiểu học: có một hồ chứa nướcbắn cá săn thưởng, một bên có vòi cấp nước và một bên có vòi xả nước. Nếu lượng nước từ vòi cấp nước lớn hơn, sau một khoảng thời gian hồ sẽ đầy (và tràn ra ngoài). Điều đó xảy ra chính là do thiếu đi điều khiển dòng chảy (flow control). Trong thực tế, nếu không có hệ thống quản lý hiệu quả để cân bằng giữa việc nạp dữ liệu và giải phóng nó, mọi thứ có thể trở nên hỗn loạn. Tương tự như việc vòi nước cấp quá mạnh mà không có cách nào để kiểm soát, dẫn đến hậu quả là toàn bộ hệ thống bị quá tải và không còn khả năng xử lý thêm bất kỳ thông tin nào nữa. Chính sự thiếu hụt trong việc điều chỉnh này đã khiến mọi thứ rơi vào tình trạng mất kiểm soát.

Có những ý tưởng gì Có khoảng bốn loại:

  • (1) Backpressure (áp lực ngược).
  • (2) Throttling (giảm tốc độ).
  • (3) Xử lý gói.
  • (4) Khóa gọi stack (Callstack blocking).

Dưới đây sẽ giải thích chi tiết từng loại.

Hiện tạibacarat, hai phiên bản RxJava 1.x và 2.x đang tồn tại cùng nhau, trong đó phiên bản 2.x có nhiều thay đổi đáng kể về giao diện so với phiên bản 1.x, bao gồm cả khía cạ Tuy nhiên, những khái niệm liên quan đến cơ chế kiểm soát luồng (Flow Control) mà chúng ta sẽ đề cập ở đây đều có thể áp dụng được cho cả hai phiên bản này. Dù có sự khác biệt giữa các phiên bản, nhưng nền tảng cốt lõi của RxJava vẫn giữ nguyên, giúp người dùng dễ dàng chuyển đổi và tiếp tục sử dụng mà không gặp quá nhiều khó khăn.

Một số ý tưởng của Flow Control

Backpressure (áp lực ngược)

Backpressurebầu cua, còn được gọi là Reactive Pull, có nghĩa là phía dưới (downstream) cần bao nhiêu dữ liệu (được chỉ định cụ thể qua yêu cầu request), phía trên (upstream) sẽ cung cấp đúng lượng đó. Điều này khá giống với cơ chế kiểm soát lưu lượng trong TCP, nơi mà bên nhận sẽ dựa trên tình trạng của cửa sổ nhận (receive window) để điều chỉnh tốc độ nhận dữ liệu và sử dụng các gói ACK ngược chiều để kiểm soát tốc độ gửi của bên gửi. Tuy nhiên, khác biệt ở đây là trong trường hợp của backpressure, sự kiểm soát không chỉ dựa vào việc xác nhận nhận dữ liệu mà còn liên quan đến việc điều chỉnh trực tiếp số lượng dữ liệu mà người gửi có thể đẩy tới người nhận, giúp tối ưu hóa hiệu suất và tránh tình trạng quá tải. Đây thực sự là một cơ chế rất hữu ích trong các hệ thống phân tán hoặc stream processing, nơi mà việc quản lý tài nguyên trở nên cực kỳ quan trọng để đảm bảo hiệu quả và ổn định của toàn bộ hệ thống.

Loại giải pháp này chỉ hoạt động đối với những Observable được gọi là "cold Observable". Cold Observable đề cập đến nguồn phát dữ liệu có thể điều chỉnh tốc độ gửi. Ví dụ như việc truyền một tệp tin giữa hai máy tínhbacarat, tốc độ truyền có thể thay đổi từ nhanh đến rất chậm, thậm chí chỉ ở mức vài byte mỗi giây, nhưng miễn là thời gian đủ dài, quá trình truyền vẫn sẽ hoàn tất. Ngược lại, các trường hợp như phát trực tiếp âm thanh và video lại thuộc về "hot Observable", vì nếu tốc độ dữ liệu giảm xuống dưới một giá trị nhất định, toàn bộ chức năng sẽ không còn khả thi nữa.

Throttling (giảm tốc độ)

Throttle (điều tiết) thực chất là loại bỏ. Khi bạn không thể xử lý hết dữ liệubacarat, hãy chọn một phần để xử lý và vứt bỏ phần còn lại. Lấy ví dụ về phát trực tiếp âm thanh và video, khi bên dưới không thể theo kịp tốc độ xử lý, thì cần phải bỏ qua một số gói dữ liệu. Điều này giúp hệ thống duy trì sự ổn định thay vì bị quá tải bởi lượng thông tin quá lớn mà nó không thể xử lý kịp thời.

Còn việc xử lý dữ liệu nào và bỏ qua dữ liệu nàobầu cua, có các chiến lược khác nhau. Có ba chiến lược chính:

  • sample (cũng gọi là throttleLast)
  • throttleFirst
  • debounce (cũng gọi là throttleWithTimeout)

Giải thích chi tiết từng loại từ góc nhìn nhỏ hơn.

Lấy mẫubắn cá săn thưởng, hay còn gọi là sampling. Nếu so sánh với việc lấy mẫu âm thanh, một tần số 8kHz có nghĩa là mỗi 125 micro giây sẽ thu thập một giá trị. Việc lấy mẫu (sampling) có thể được cấu hình theo nhiều cách khác nhau, ví dụ như lấy mẫu một lần mỗi 100 miligiây. Tuy nhiên, trong khoảng thời gian 100 miligiây đó, có thể có rất nhiều giá trị từ nguồn cấp dữ liệu truyền đến, vậy nên chọn giá trị nào? Thông thường, giá trị được chọn là giá trị cuối cùng trong khoảng thời gian đó. Vì lý do này, phương pháp này cũng được gọi là throttleLast, tức là chỉ giữ lại giá trị cuối cùng trong một khoảng thời gian nhất định. Trong thực tế, điều này có thể được áp dụng trong nhiều trường hợp khác nhau, chẳng hạn như khi xử lý tín hiệu hoặc dữ liệu thời gian thực. Khi lượng dữ liệu lớn và cần phải lọc bớt để giảm tải hệ thống, việc sử dụng throttleLast sẽ giúp tối ưu hóa hiệu suất mà vẫn đảm bảo không bỏ sót các thông tin quan trọng.

sample

ThrottleFirst khá giống với samplebầu cua, ví dụ như nó cũng sẽ lấy một giá trị mỗi 100 miligiây, nhưng thay vì chọn ngẫu nhiên thì nó sẽ ưu tiên giá trị đầu tiên trong khoảng thời gian đó. Trong lập trình Android, ThrottleFirst có thể được sử dụng để ngăn chặn hiện tượng bắn sự kiện click liên tục (được gọi là "debounce"). Lý do là vì nó chỉ xử lý sự kiện click đầu tiên trong một khoảng thời gian xác định (tức là "lấy mẫu" giá trị đầu tiên), đồng thời bỏ qua các sự kiện click tiếp theo trong cùng khoảng thời gian đó. Điều này đặc biệt hữu ích khi bạn muốn tránh việc người dùng vô tình nhấn nút nhiều lần trong thời gian ngắn, dẫn đến các hành động không mong muốn hoặc làm giảm hiệu suất ứng dụng. Với sự giúp đỡ của ThrottleFirst, bạn có thể đảm bảo rằng chỉ những tương tác quan trọng nhất mới được thực thi trong một khung thời gian cụ thể, từ đó cải thiện trải nghiệm người dùng và độ ổn định của ứng dụng.

sample

Debouncebắn cá săn thưởng, còn được gọi là throttleWithTimeout, chính tên của nó đã cho thấy một ví dụ điển hình. Hãy tưởng tượng một chương trình mạng đang duy trì một kết nối TCP, liên tục gửi và nhận dữ liệu qua lại. Tuy nhiên, trong quá trình đó, sẽ có những khoảng thời gian mà không có bất kỳ dữ liệu nào cần được gửi hoặc nhận. Những khoảng thời gian này được gọi là khoảng thời gian nhàn rỗi (idle time). Khi khoảng thời gian nhàn rỗi này vượt quá giá trị giới hạn trước đó đã được đặt sẵn, thì coi như đã xảy ra hiện tượng "chờ lâu" (timeout), và ở thời điểm đó, có thể cần phải ngắt kết nối. Thực tế, nhiều chương trình mạng làm việc trên server cũng hoạt động theo cách tương tự. Sau khi gửi hoặc nhận một gói dữ liệu, chương trình sẽ bắt đầu một bộ hẹn giờ, chờ đợi một khoảng thời gian nhàn rỗi. Nếu trong khoảng thời gian đó, có thêm các hoạt động gửi hoặc nhận dữ liệu, bộ hẹn giờ sẽ được đặt lại để bắt đầu chờ một khoảng thời gian nhàn rỗi mới. Nhưng nếu bộ hẹn giờ hết thời gian chờ mà không có thêm hoạt động nào xảy ra, nghĩa là đã xảy ra timeout, lúc này kết nối có thể bị đóng. Hành vi của debounce khá giống với điều này, nó có thể giúp phát hiện các khoảng thời gian nhàn rỗi lớn giữa các sự kiện liên tiếp. Nói cách khác, debounce có thể giúp xác định những khoảng trống lớn giữa các sự kiện xảy ra liên tục.

sample

Xử lý gói

Việc đóng gói là quá trình tập hợp các gói nhỏ từ nguồn thành những gói lớn hơn và phân phát chúng xuống phía dưới. Điều này giúp giảm số lượng gói mà phía dưới cần xử lý. Trong RxJavabầu cua, có hai cơ chế tương tự được cung cấp: buffer và window. Buffer hoạt động như một kho chứa tạm thời, nơi các sự kiện hoặc dữ liệu được tích lũy trong khoảng thời gian hoặc số lượng cụ thể trước khi được gửi đi. Còn window lại chia dữ liệu thành các "cửa sổ" thời gian hoặc số lượng, mỗi cửa sổ sẽ đại diện cho một phần độc lập của luồng dữ liệu. Cả hai phương thức này đều giúp tối ưu hóa việc quản lý và xử lý dữ liệu trong chuỗi quan sát (observable).

sample

sample

Cả buffer và window đều có chức năng tương tự nhaubắn cá săn thưởng, chỉ khác biệt ở định dạng đầu ra: buffer sẽ gói các phần tử thành một danh sách (List), trong khi window lại gói chúng thành một đối tượ Điều này có nghĩa là khi sử dụng buffer, bạn sẽ nhận được một tập hợp các phần tử được đóng gói gọn gàng trong một danh sách duy nhất. Ngược lại, với window, mỗi gói sẽ được biểu diễn dưới dạng một chuỗi quan sát (Observable) riêng lẻ, cho phép bạn xử lý từng phần tử theo cách linh hoạt hơn trong luồng dữ liệu.

Khóa gọi stack (Callstack blocking)

Đây là một trường hợp đặc biệtbầu cua, làm tắc nghẽn toàn bộ chuỗi gọi (Callstack blocking). Nó được coi là một trường hợp đặc biệt bởi vì cách này chỉ khả thi khi toàn bộ chuỗi gọi (call chain) đều được thực hiện đồng bộ trên cùng một luồng. Điều này có nghĩa là không được phép có bất kỳ operator nào trong chuỗi khởi chạy một luồng mới. Trong thực tế sử dụng thông thường, điều này khá hiếm gặp, vì chúng ta thường dùng các phương thức như subscribeOn hoặc observeOn để chuyển đổi giữa các luồng khác nhau, và một số operator phức tạp hơn thậm chí còn tự động tạo ra các luồng mới để xử lý dữ liệu bên trong. Mặt khác, nếu thực sự xuất hiện một chuỗi gọi hoàn toàn đồng bộ, thì các phương pháp kiểm soát dòng chảy (flow control) khác mà bạn đã biết vẫn có thể áp dụng, nhưng cách chặn này lại đơn giản hơn nhiều và không cần thêm bất kỳ hỗ trợ nào từ bên ngoài.

gọi chặn (call stack blocking)

Làm thế nào để Observable hỗ trợ

Trong RxJava 1.xbacarat, có những Observable hỗ trợ Backpressure và cũng có những Observable không hỗ trợ. Tuy nhiên, các Observable không hỗ trợ Backpressure hoàn toàn có thể được chuyển đổi thành loại Observable hỗ trợ Backpressure thông qua một số toán tử (operators). Những toán tử này bao gồm: 1. **`onBackpressureBuffer()`**: Giúp lưu trữ dữ liệu trong một bộ đệm khi tốc độ phát sinh dữ liệu vượt quá khả năng xử lý. Điều này giúp tránh lỗi do Backpressure nhưng có thể dẫn đến việc tiêu tốn bộ nhớ nếu dữ liệu phát sinh quá nhanh. 2. **`onBackpressureLatest()`**: Toán tử này sẽ chỉ giữ lại giá trị mới nhất từ chuỗi dữ liệu khi Backpressure xảy ra. Điều này hữu ích khi bạn quan tâm đến giá trị mới nhất mà không cần giữ lại tất cả dữ liệu đã bỏ lỡ. 3. **`buffer()` và `window()`**: Hai toán tử này giúp phân chia luồng dữ liệu thành các nhóm nhỏ hoặc các cửa sổ thời gian, làm giảm áp lực và giúp kiểm soát dòng chảy của dữ liệu. 4. **`sample()`**: Cho phép chọn ra dữ liệu ở các khoảng thời gian cụ thể thay vì nhận mọi sự kiện liên tục. Điều này giúp giảm tải và quản lý hiệu quả hơn. 5. **`throttleFirst()` và `throttleLast()`**: Hai toán tử này giúp giới hạn số lượng sự kiện được xử lý trong một khoảng thời gian nhất định. `throttleFirst()` giữ lại sự kiện đầu tiên trong mỗi khoảng thời gian, còn `throttleLast()` giữ lại sự kiện cuối cùng. Những toán tử trên cho phép bạn linh hoạt điều chỉnh dòng chảy dữ liệu để giải quyết vấn đề Backpressure một cách hiệu quả trong RxJava 1.x.

  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureBlock (đã ngừng sử dụng)

Chúng chuyển đổi thành các Observable có các chiến lược Backpressure khác nhau.

Trong RxJava 2.xbầu cua, Observable không còn hỗ trợ Backpressure nữa mà thay vào đó sử dụng Flowable để chuyên biệt hóa việc hỗ trợ Những nhà phát triển đã thiết kế bốn loại toán tử mà bạn vừa đề cập, trong đó ba loại đầu tiên lần lượt tương ứng với ba chiến lược Backpressure khác nhau của Flowable: 1. **On Backpressure Error**: Chiến lược này sẽ ném ra lỗi nếu dữ liệu được tạo ra nhanh hơn tốc độ tiêu thụ. 2. **On Backpressure Buffer**: Đây là cách tiếp cận an toàn hơn khi dữ liệu bị giữ lại trong một bộ đệm lớn cho đến khi người tiêu dùng có thể xử lý nó. 3. **On Backpressure Drop**: Chiến lược này đơn giản hóa vấn đề bằng cách bỏ qua các sự kiện dư thừa nếu người tiêu dùng không thể theo kịp. Các chiến lược này cung cấp nhiều tùy chọn linh hoạt để giải quyết vấn đề Backpressure và giúp lập trình viên kiểm soát hiệu quả luồng dữ liệu trong ứng dụng.

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST

onBackpressureBuffer là cách xử lý dữ liệu mà không làm mất bất kỳ thông tin nào. Nó sẽ lưu trữ toàn bộ dữ liệu nhận được từ nguồn vào một bộ đệm và chỉ gửi đi khi nguồn yêu cầu. Điều này giống như một con đập chứa nước. Tuy nhiênbacarat, nếu tốc độ của nguồn quá nhanh, bộ đệm (buffer) có thể bị quá tải và dẫn đến hiện tượng tràn dữ liệu. Trong thực tế, điều này có thể gây ra những thách thức lớn nếu không được kiểm soát cẩn thận. Một số giải pháp thường được áp dụng là tăng kích thước của bộ đệm hoặc điều chỉnh tốc độ xử lý của nguồn upstream để phù hợp hơn với khả năng tiếp nhận của nguồ Điều này giúp duy trì sự ổn định trong hệ thống và tránh tình trạng mất dữ liệu quan trọng.

sample

Cả `onBackpressureDrop` và `onBackpressureLatest` đều có điểm chung là bỏ qua dữ liệubầu cua, và chúng hoạt động như một cơ chế cấp phát token (hoặc cơ chế hạn ngạch). Dòng chảy dữ liệu giữa các giai đoạn của luồng sẽ dựa trên việc yêu cầu token từ phía người nhận (downstream) gửi lên cho nguồn cung cấp (upstream). Khi số lượng token giảm xuống 0, nguồn cung cấp sẽ bắt đầu bỏ qua dữ liệu. Tuy nhiên, hai chiến lược này có sự khác biệt tinh tế khi số lượng token đạt đến 0: với `onBackpressureDrop`, mọi dữ liệu bị loại bỏ hoàn toàn mà không lưu trữ bất kỳ thông tin nào; còn `onBackpressureLatest` lại giữ lại dữ liệu gần nhất, do đó khi có token mới được cấp phát, nó sẽ ưu tiên đẩy dữ liệu "mới nhất" đã được lưu vào dòng chảy trước tiên. Để hiểu rõ hơn về sự khác biệt này, bạn có thể tham khảo hình ảnh minh họa bên dưới để thấy cách mà mỗi chiến lược xử lý dữ liệu khi token về 0. --- Hình ảnh thứ nhất sẽ mô tả cách hoạt động của `onBackpressureDrop`: khi không còn token, dữ liệu bị bỏ qua ngay lập tức mà không có bất kỳ hành động lưu trữ nào. Hình ảnh thứ hai sẽ thể hiện cách hoạt động của `onBackpressureLatest`: dữ liệu sẽ được lưu tạm thời trong bộ nhớ và chỉ được truyền đi khi token mới xuất hiện, ưu tiên dữ liệu "mới nhất" mà nó đã giữ lại trước đó. Hy vọng những giải thích và hình ảnh minh họa này sẽ giúp bạn hiểu rõ hơn về sự khác biệt giữa hai chiến lược này!

sample

sample

Khi sử dụng onBackpressureBlockbầu cua, hệ thống sẽ kiểm tra xem phía hạ lưu có yêu cầu không. Nếu có yêu cầu, dữ liệu sẽ được truyền xuống phía hạ lưu. Tuy nhiên, nếu phía hạ lưu chưa sẵn sàng nhận thêm dữ liệu, thay vì loại bỏ dữ liệu đó, hệ thống sẽ cố gắng chặn dòng dữ liệu từ phía thượng nguồn (tuy nhiên, liệu điều này có hiệu quả hay không còn phụ thuộc vào cách xử lý của phía thượng nguồn). Quan trọng là bản thân phương thức này không lưu trữ dữ liệu tạm thời mà chỉ tập trung vào việc quản lý luồng dữ liệu theo thời gian thực. Chiến lược này đã bị bỏ qua không sử dụng nữa.


Bài viết này chủ yếu tập trung vào việc mô tả và so sánh các cơ chế điều khiển dòng dữ liệu (Flow Control) cũng như các phương pháp xử lý áp lực dữ liệu (Backpressure) trong RxJava dưới góc nhìn tổng quan. Nhiều chi tiết cụ thể đã không được đề cập đến. Ví dụbầu cua, ngoài khả năng gói dữ liệu nhận được trong một khoảng thời gian nhất định, cả buffer và window còn có thể đóng gói một số lượng cố định các dữ liệu khác nhau. Hơn nữa, cách mà onBackpressureDrop và onBackpressureLatest phản ứng khi nhận được nhiều yêu cầu dữ liệu từ luồng phía dưới trong một lần duy nhất cũng chưa được giải thích chi tiết trong bài viết này. Để hiểu rõ hơn về những khía cạnh này, bạn có thể tham khảo tài liệu tham chiếu API tương ứng để tìm câu trả lời. Tôi cũng rất mong nhận được sự góp ý và thảo luận từ bạn đọc qua phần bình luận dưới bài viết.

(Kết thúc)

Các bài viết được chọn lọc khác


Bài viết gốcbầu cua, xin vui lòng trích dẫn nguồn và bao gồm mã QR bên dưới! Nếu không, từ chối tái bản!
Liên kết bài viết: /law4cr92.html
Hãy theo dõi tài khoản Weibo cá nhân của tôi: Tìm kiếm tên tôi "Trương Thiết Lệ" trên Weibo.
Tài khoản WeChat của tôi: tielei-blog (Trương Thiết Lệ)
Bài trước: Giao tiếp hiệu quả như đòn đánh cuối cùng của Ninja!
Bài sau: Ba cấp độ của kiến thức

Bài viết mới nhất