Мільйон WebSockets і вперед

Привіт всім! Мене звати Сергій Камардін, я розробник Mail.Ru.

Ця стаття розповідає про те, як ми розробили високонавантажений сервер WebSocket за допомогою Go.

Якщо ви знайомі з WebSockets, але мало що знаєте про Go, сподіваюся, ця стаття все-таки виявиться цікавою з точки зору ідей та методів оптимізації продуктивності.

1. Вступ

Щоб визначити контекст нашої історії, слід сказати кілька слів про те, для чого нам потрібен цей сервер.

Mail.Ru має безліч систем з підтримкою стану. Зберігання електронної пошти користувача - одне з них. Є кілька способів відстежувати зміни стану в системі - і про події в системі. Здебільшого це відбувається шляхом періодичного опитування системи або системних повідомлень про зміни його стану.

Обидва способи мають свої плюси і мінуси. Але коли справа стосується пошти, чим швидше користувач отримує нову пошту, тим краще.

Опитування пошти включає близько 50 000 HTTP-запитів на секунду, 60% з яких повертають статус 304, тобто в поштовій скриньці немає змін.

Тому, щоб зменшити навантаження на сервери та пришвидшити доставку пошти користувачам, було прийнято рішення перевинайти колесо, написавши сервер видавець-передплатник (також відомий як шина, посередник повідомлень або канал), який отримував би сповіщення про зміни стану, з одного боку, та підписки на такі сповіщення, з іншого.

Раніше:

Зараз:

Перша схема показує, як це було раніше. Браузер періодично опитував API і запитував про зміни в сховищі (послуга поштових скриньок).

Друга схема описує нову архітектуру. Браузер встановлює з’єднання WebSocket з API сповіщень, який є клієнтом сервера Bus. Отримавши новий електронний лист, Storage надсилає повідомлення про це Bus (1), а Bus своїм абонентам (2). API визначає підключення для надсилання отриманого сповіщення та надсилає його в браузер користувача (3).

Отже, сьогодні ми поговоримо про API або сервер WebSocket. Забігаючи наперед, я скажу вам, що сервер матиме близько 3 мільйонів інтернет-з'єднань.

2. Ідіоматичний спосіб

Давайте подивимося, як ми реалізуємо певні частини нашого сервера за допомогою простих функцій Go без будь-яких оптимізацій.

Перш ніж продовжити net/http, давайте поговоримо про те, як ми будемо надсилати та отримувати дані. Дані, що стоять над протоколом WebSocket (наприклад, об'єкти JSON), надалі називатимуться пакетами .

Почнемо реалізовувати Channelструктуру, яка міститиме логіку надсилання та отримання таких пакетів через з'єднання WebSocket.

2.1. Структура каналу

// Packet represents application level data. type Packet struct { ... } // Channel wraps user connection. type Channel struct { conn net.Conn // WebSocket connection. send chan Packet // Outgoing packets queue. } func NewChannel(conn net.Conn) *Channel { c := &Channel{ conn: conn, send: make(chan Packet, N), } go c.reader() go c.writer() return c }

Я хотів би звернути вашу увагу на запуск двох горутин для читання та письма. Кожна програма вимагає власного стека пам'яті, який може мати початковий розмір від 2 до 8 КБ залежно від операційної системи та версії Go.

Щодо вищезазначеної кількості 3 мільйонів інтернет-з’єднань, нам знадобиться 24 ГБ пам’яті (зі стеком 4 КБ) для всіх з’єднань. І це без пам'яті, виділеної для Channelструктури, вихідних пакетів ch.sendта інших внутрішніх полів.

2.2. Програми введення / виводу

Давайте подивимось на реалізацію “читача”:

func (c *Channel) reader() { // We make a buffered read to reduce read syscalls. buf := bufio.NewReader(c.conn) for { pkt, _ := readPacket(buf) c.handle(pkt) } }

Тут ми використовуємо, bufio.Readerщоб зменшити кількість read()системних викликів і прочитати стільки, скільки дозволяє bufрозмір буфера. В межах нескінченного циклу ми очікуємо надходження нових даних. Будь ласка, пам’ятайте слова: очікуйте надходження нових даних. До них ми повернемось пізніше.

Ми залишимо осторонь синтаксичний розбір та обробку вхідних пакетів, оскільки це не важливо для оптимізацій, про які ми поговоримо. Однак, bufзараз варто нашої уваги: ​​за замовчуванням це 4 КБ, що означає ще 12 ГБ пам'яті для наших з’єднань. Подібна ситуація і з "письменником":

func (c *Channel) writer() { // We make buffered write to reduce write syscalls. buf := bufio.NewWriter(c.conn) for pkt := range c.send { _ := writePacket(buf, pkt) buf.Flush() } }

Ми перебираємо канал вихідних пакетів c.sendі записуємо їх у буфер. Це, як вже можуть здогадатися наші уважні читачі, ще 4 КБ і 12 ГБ пам’яті для наших 3 мільйонів підключень.

2.3. HTTP

У нас вже є проста Channelреалізація, тепер нам потрібно підключити WebSocket для роботи. Оскільки ми все ще перебуваємо під заголовком Ідіоматичний шлях , давайте зробимо це відповідним чином.

Примітка: Якщо ви не знаєте, як працює WebSocket, слід зазначити, що клієнт переходить на протокол WebSocket за допомогою спеціального механізму HTTP, який називається Upgrade. Після успішної обробки запиту на оновлення сервер і клієнт використовують TCP-з'єднання для обміну двійковими кадрами WebSocket. Ось опис каркасної структури всередині з'єднання.
import ( "net/http" "some/websocket" ) http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(r, w) ch := NewChannel(conn) //... })

Будь ласка , зверніть увагу , що http.ResponseWriterробить виділення пам'яті для bufio.Readerі bufio.Writer(як з 4 КБ буфера) для *http.Requestініціалізації і подальшого написання відповіді.

Незалежно від використовуваної бібліотеки WebSocket, після успішної відповіді на запит на оновлення, сервер отримує буфери вводу-виводу разом із з'єднанням TCP після responseWriter.Hijack()дзвінка.

Підказка: в деяких випадках за допомогою виклику go:linknameможна повернути буфери sync.Poolвсередину .net/httpnet/http.putBufio{Reader,Writer}

Таким чином, нам потрібно ще 24 ГБ пам’яті для 3 мільйонів підключень.

Отже, загалом 72 ГБ пам’яті для програми, яка поки нічого не робить!

3. Оптимізації

Давайте переглянемо те, про що ми говорили у вступній частині, і згадаємо, як поводиться підключення користувача. Після переходу на WebSocket клієнт відправляє пакет із відповідними подіями або іншими словами підписується на події. Тоді (не беручи до уваги технічні повідомлення, такі як ping/pong), клієнт не може надсилати нічого іншого протягом усього терміну служби з'єднання.

Термін служби з’єднання може тривати від кількох секунд до декількох днів.

Отже, більшість часу ми Channel.reader()і Channel.writer()чекаємо обробки даних для отримання або надсилання. Разом з ними очікують буфери вводу-виводу по 4 КБ кожен.

Тепер зрозуміло, що певні речі можна зробити краще, чи не так?

3.1. Нетполь

Ви пам'ятаєте Channel.reader()реалізацію, яка передбачала отримання нових даних , заблокувавши conn.Read()виклик усередині bufio.Reader.Read()? Якщо в підключенні були дані, Go Runtime "розбудив" нашу програму і дозволив їй прочитати наступний пакет. Після цього програма знову заблокувалася, очікуючи нових даних. Давайте подивимося, як Go Runtime розуміє, що програму потрібно «прокинути».

Якщо ми подивимося на реалізацію conn.Read (), то побачимо всередині неї виклик net.netFD.Read ():

// net/fd_unix.go func (fd *netFD) Read(p []byte) (n int, err error) { //... for { n, err = syscall.Read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN { if err = fd.pd.waitRead(); err == nil { continue } } } //... break } //... }
Go uses sockets in non-blocking mode. EAGAIN says there is no data in the socket and not to get locked on reading from the empty socket, OS returns control to us.

We see a read() syscall from the connection file descriptor. If read returns the EAGAIN error, runtime makes the pollDesc.waitRead() call:

// net/fd_poll_runtime.go func (pd *pollDesc) waitRead() error { return pd.wait('r') } func (pd *pollDesc) wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) //... }

If we dig deeper, we’ll see that netpoll is implemented using epoll in Linux and kqueue in BSD. Why not use the same approach for our connections? We could allocate a read buffer and start the reading goroutine only when it is really necessary: when there is really readable data in the socket.

On github.com/golang/go, there is the issue of exporting netpoll functions.

3.2. Getting rid of goroutines

Suppose we have netpoll implementation for Go. Now we can avoid starting the Channel.reader() goroutine with the inside buffer, and subscribe for the event of readable data in the connection:

ch := NewChannel(conn) // Make conn to be observed by netpoll instance. poller.Start(conn, netpoll.EventRead, func() { // We spawn goroutine here to prevent poller wait loop // to become locked during receiving packet from ch. go Receive(ch) }) // Receive reads a packet from conn and handles it somehow. func (ch *Channel) Receive() { buf := bufio.NewReader(ch.conn) pkt := readPacket(buf) c.handle(pkt) }

It is easier with the Channel.writer() because we can run the goroutine and allocate the buffer only when we are going to send the packet:

func (ch *Channel) Send(p Packet) { if c.noWriterYet() { go ch.writer() } ch.send <- p }
Note that we do not handle cases when operating system returns EAGAIN on write() system calls. We lean on Go runtime for such cases, cause it is actually rare for such kind of servers. Nevertheless, it could be handled in the same way if needed.

After reading the outgoing packets from ch.send (one or several), the writer will finish its operation and free the goroutine stack and the send buffer.

Perfect! We have saved 48 GB by getting rid of the stack and I/O buffers inside of two continuously running goroutines.

3.3. Control of resources

A great number of connections involves not only high memory consumption. When developing the server, we experienced repeated race conditions and deadlocks often followed by the so-called self-DDoS — a situation when the application clients rampantly tried to connect to the server thus breaking it even more.

For example, if for some reason we suddenly could not handle ping/pong messages, but the handler of idle connections continued to close such connections (supposing that the connections were broken and therefore provided no data), the client appeared to lose connection every N seconds and tried to connect again instead of waiting for events.

It would be great if the locked or overloaded server just stopped accepting new connections, and the balancer before it (for example, nginx) passed request to the next server instance.

Moreover, regardless of the server load, if all clients suddenly want to send us a packet for any reason (presumably by cause of bug), the previously saved 48 GB will be of use again, as we will actually get back to the initial state of the goroutine and the buffer per each connection.

Goroutine pool

We can restrict the number of packets handled simultaneously using a goroutine pool. This is what a naive implementation of such pool looks like:

package gopool func New(size int) *Pool { return &Pool{ work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) } } func (p *Pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work } }

Now our code with netpoll looks as follows:

pool := gopool.New(128) poller.Start(conn, netpoll.EventRead, func() { // We will block poller wait loop when // all pool workers are busy. pool.Schedule(func() { Receive(ch) }) })

So now we read the packet not only upon readable data appearance in the socket, but also upon the first opportunity to take up the free goroutine in the pool.

Similarly, we’ll change Send():

pool := gopool.New(128) func (ch *Channel) Send(p Packet) { if c.noWriterYet() { pool.Schedule(ch.writer) } ch.send <- p }

Instead of go ch.writer(), we want to write in one of the reused goroutines. Thus, for a pool of N goroutines, we can guarantee that with N requests handled simultaneously and the arrived N + 1 we will not allocate a N + 1 buffer for reading. The goroutine pool also allows us to limit Accept() and Upgrade() of new connections and to avoid most situations with DDoS.

3.4. Zero-copy upgrade

Let’s deviate a little from the WebSocket protocol. As was already mentioned, the client switches to the WebSocket protocol using a HTTP Upgrade request. This is what it looks like:

GET /ws HTTP/1.1 Host: mail.ru Connection: Upgrade Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== Sec-Websocket-Version: 13 Upgrade: websocket HTTP/1.1 101 Switching Protocols Connection: Upgrade Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= Upgrade: websocket

That is, in our case we need the HTTP request and its headers only for switch to the WebSocket protocol. This knowledge and what is stored inside the http.Request suggests that for the sake of optimization, we could probably refuse unnecessary allocations and copyings when processing HTTP requests and abandon the standard net/http server.

For example, the http.Request contains a field with the same-name Header type that is unconditionally filled with all request headers by copying data from the connection to the values strings. Imagine how much extra data could be kept inside this field, for example for a large-size Cookie header.

But what to take in return?

WebSocket implementation

Unfortunately, all libraries existing at the time of our server optimization allowed us to do upgrade only for the standard net/http server. Moreover, neither of the (two) libraries made it possible to use all the above read and write optimizations. For these optimizations to work, we must have a rather low-level API for working with WebSocket. To reuse the buffers, we need the procotol functions to look like this:

func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error

If we had a library with such API, we could read packets from the connection as follows (the packet writing would look the same):

// getReadBuf, putReadBuf are intended to // reuse *bufio.Reader (with sync.Pool for example). func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // readPacket must be called when data could be read from conn. func readPacket(conn io.Reader) error { buf := getReadBuf() defer putReadBuf(buf) buf.Reset(conn) frame, _ := ReadFrame(buf) parsePacket(frame.Payload) //... }

In short, it was time to make our own library.

github.com/gobwas/ws

Ideologically, the ws library was written so as not to impose its protocol operation logic on users. All reading and writing methods accept standard io.Reader and io.Writer interfaces, which makes it possible to use or not to use buffering or any other I/O wrappers.

Besides upgrade requests from standard net/http, ws supports zero-copy upgrade, the handling of upgrade requests and switching to WebSocket without memory allocations or copyings. ws.Upgrade() accepts io.ReadWriter (net.Conn implements this interface). In other words, we could use the standard net.Listen() and transfer the received connection from ln.Accept() immediately to ws.Upgrade(). The library makes it possible to copy any request data for future use in the application (for example, Cookie to verify the session).

Below there are benchmarks of Upgrade request processing: standard net/http server versus net.Listen() with zero-copy upgrade:

BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op

Switching to ws and zero-copy upgrade saved us another 24 GB — the space allocated for I/O buffers upon request processing by the net/http handler.

3.5. Summary

Let’s structure the optimizations I told you about.

  • A read goroutine with a buffer inside is expensive. Solution: netpoll (epoll, kqueue); reuse the buffers.
  • A write goroutine with a buffer inside is expensive. Solution: start the goroutine when necessary; reuse the buffers.
  • With a storm of connections, netpoll won’t work. Solution: reuse the goroutines with the limit on their number.
  • net/http is not the fastest way to handle Upgrade to WebSocket. Solution: use the zero-copy upgrade on bare TCP connection.

That is what the server code could look like:

import ( "net" "github.com/gobwas/ws" ) ln, _ := net.Listen("tcp", ":8080") for { // Try to accept incoming connection inside free pool worker. // If there no free workers for 1ms, do not accept anything and try later. // This will help us to prevent many self-ddos or out of resource limit cases. err := pool.ScheduleTimeout(time.Millisecond, func() { conn := ln.Accept() _ = ws.Upgrade(conn) // Wrap WebSocket connection with our Channel struct. // This will help us to handle/send our app's packets. ch := NewChannel(conn) // Wait for incoming bytes from connection. poller.Start(conn, netpoll.EventRead, func() { // Do not cross the resource limits. pool.Schedule(func() { // Read and handle incoming packet(s). ch.Recevie() }) }) }) if err != nil { time.Sleep(time.Millisecond) } }

4. Conclusion

Premature optimization is the root of all evil (or at least most of it) in programming. Donald Knuth

Of course, the above optimizations are relevant, but not in all cases. For example if the ratio between free resources (memory, CPU) and the number of online connections is rather high, there is probably no sense in optimizing. However, you can benefit a lot from knowing where and what to improve.

Thank you for your attention!

5. References

  • //github.com/mailru/easygo
  • //github.com/gobwas/ws
  • //github.com/gobwas/ws-examples
  • //github.com/gobwas/httphead
  • Russian version of this article