[ENG] help! websocket write: broken pipe, and ws.cleanupConnection() Unlock() not work #291

Open
AndrewYEEE opened this issue Aug 31, 2024 · 3 comments

Comments

@AndrewYEEE

OCPP version:
[x] 1.6
[ ] 2.0.1

I'm submitting a ...

[x] bug report
[ ] feature request

Current behavior:
A ChargingPoint sent a large number of Heartbeats at the same time because of a fault,
and we encountered the following error:

time="2024-08-31T06:45:32+08:00" level=info msg=CCCCCC____Heartbeat
time="2024-08-31T06:45:32+08:00" level=info msg=BBBBBB____Heartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAA____Heartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAA____Heartbeat
time="2024-08-31T06:45:33+08:00" level=info msg="write failed for AAAAAA: %!w(*net.OpError=&{write tcp 0xc0007a24b0 0xc0007a24e0 0xc000682a60})" logger=websocket
time="2024-08-31T06:45:33+08:00" level=error msg="write failed for AAAAAA: write tcp 172.18.0.xx:9001->172.18.0.xx:57006: write: broken pipe" logger=websocket
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAA____Heartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=EEEEEE____Heartbeat

But the next step is not printed:
"closed connection to AAAAAA"

According to writePump() in ws/websocket.go, when "write failed" occurs, the next step is to call server.cleanupConnection(ws), which closes the connection and prints "closed connection to AAAAAA". That never happens.

We believe the program is blocked inside server.cleanupConnection(ws), so server.connMutex.Unlock() is never reached.

As a result, all subsequent messages are blocked and the server cannot respond to any ChargingPoint, because they are all waiting on server.connMutex.

Can anyone explain this?

Is it possible that a race condition occurs in cleanupConnection(), or that close(ws.outQueue) and close(ws.closeC) in server.cleanupConnection(ws) are blocking?

Please help me! Thank you!

@AndrewYEEE AndrewYEEE changed the title [ENG] websocket write: broken pipe, and ws.cleanupConnection() Unlock() not work [ENG] help! websocket write: broken pipe, and ws.cleanupConnection() Unlock() not work Aug 31, 2024
@AndrewYEEE
Author

AndrewYEEE commented Sep 1, 2024

I may have found the problem.

The problem is that RLock() is held in a code path that can block while another goroutine needs Lock():

    func (server *Server) Write(webSocketId string, data []byte) error {
    	server.connMutex.RLock()
    	defer server.connMutex.RUnlock() // <============= read lock is held until Write returns
    	ws, ok := server.connections[webSocketId]
    	if !ok {
    		return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
    	}
    	log.Debugf("queuing data for websocket %s", webSocketId)
    	ws.outQueue <- data // <============= this send can block while the read lock is still held
    	return nil
    }

When several messages arrive at the same time, several goroutines hold RLock() at once.
If one of the writes fails, cleanupConnection() is called from writePump():

    case ping := <-ws.pingMessage:
    	_ = conn.SetWriteDeadline(time.Now().Add(server.timeoutConfig.WriteWait))
    	err := conn.WriteMessage(websocket.PongMessage, ping)
    	if err != nil {
    		server.error(fmt.Errorf("write failed for %s: %w", ws.ID(), err))
    		// Invoking cleanup, as socket was forcefully closed
    		server.cleanupConnection(ws) // <============= here
    		return
    	}

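cleanupConnection() calls Lock(), which has to wait until every outstanding RLock() is released. If a Write() call is stuck on the channel send while holding RLock(), Lock() never returns and the result is a deadlock.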

    func (server *Server) cleanupConnection(ws *WebSocket) {
    	_ = ws.connection.Close()
    	server.connMutex.Lock() // <============== here
    	close(ws.outQueue)
    	close(ws.closeC)
    	delete(server.connections, ws.id)
    	server.connMutex.Unlock()
    	log.Infof("closed connection to %s", ws.ID())
    	if server.disconnectedHandler != nil {
    		server.disconnectedHandler(ws)
    	}
    }
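
The suspected interaction can be reproduced outside the library. The following is a minimal sketch, not the library's code: it assumes an unbuffered queue that nobody drains (as would be the case once the goroutine that reads the queue has moved on to cleanup), and the name lockBlockedByReader and the 200 ms wait are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lockBlockedByReader reports whether a writer's Lock() is still waiting
// after 200ms while a reader holds RLock() and is stuck sending on an
// unbuffered channel that nobody drains. (Hypothetical helper.)
func lockBlockedByReader() bool {
	var mu sync.RWMutex
	outQueue := make(chan []byte) // unbuffered, no consumer: the send never completes
	readerHoldsLock := make(chan struct{})

	// Stand-in for Write(): takes the read lock and blocks on the send
	// while still holding it.
	go func() {
		mu.RLock()
		defer mu.RUnlock()
		close(readerHoldsLock)
		outQueue <- []byte("heartbeat")
	}()
	<-readerHoldsLock

	// Stand-in for cleanupConnection(): needs the exclusive lock.
	acquired := make(chan struct{})
	go func() {
		mu.Lock()
		defer mu.Unlock()
		close(acquired)
	}()

	select {
	case <-acquired:
		return false
	case <-time.After(200 * time.Millisecond):
		return true
	}
}

func main() {
	fmt.Println("Lock() blocked by reader:", lockBlockedByReader())
}
```

In this sketch the two goroutines stay blocked for the life of the process, which is the same symptom as the missing "closed connection" log line.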

So we need to move the RUnlock() call so that the read lock is no longer held during the channel send:

    func (server *Server) Write(webSocketId string, data []byte) error {
    	server.connMutex.RLock()
    	// defer server.connMutex.RUnlock() // <======== moved from here
    	ws, ok := server.connections[webSocketId]
    	server.connMutex.RUnlock() // <======== to here
    	if !ok {
    		return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
    	}
    	log.Debugf("queuing data for websocket %s", webSocketId)
    	ws.outQueue <- data
    	return nil
    }
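
The effect of releasing the read lock before the send can be checked in isolation. This is a sketch under assumptions: the server type, its write method, and lockAcquiredWhileSendBlocked are hypothetical stand-ins, not the library's types, and the queue is again unbuffered with no consumer.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// server is a hypothetical, stripped-down stand-in for the library's Server.
type server struct {
	mu          sync.RWMutex
	connections map[string]chan []byte
}

// write looks up the queue under the read lock, releases the lock,
// and only then performs the potentially blocking send.
func (s *server) write(id string, data []byte) error {
	s.mu.RLock()
	q, ok := s.connections[id]
	s.mu.RUnlock()
	if !ok {
		return fmt.Errorf("no socket with id %v is open", id)
	}
	q <- data // may block, but no lock is held here
	return nil
}

// lockAcquiredWhileSendBlocked reports whether Lock() succeeds within one
// second while a write may be stuck on an undrained, unbuffered queue.
func lockAcquiredWhileSendBlocked() bool {
	s := &server{connections: map[string]chan []byte{"AAAAAA": make(chan []byte)}}
	go func() { _ = s.write("AAAAAA", []byte("heartbeat")) }()

	acquired := make(chan struct{})
	go func() {
		s.mu.Lock()
		delete(s.connections, "AAAAAA")
		s.mu.Unlock()
		close(acquired)
	}()

	select {
	case <-acquired:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("Lock() acquired:", lockAcquiredWhileSendBlocked())
}
```

One caveat with this approach: once the lock is released, nothing stops cleanup from closing the queue before the send runs, and in Go a send on a closed channel panics.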

I'm not sure this is correct. Please help me confirm, thanks.
@lorenzodonini

@andig
Contributor

andig commented Sep 5, 2024

> There is Lock() in cleanupConnection(), which will cause Lock() to be blocked by other RLock() and generate Deadlock.

This would only deadlock if the queue is blocked and/or not buffered.
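
To illustrate when a channel send blocks at all, here is a small sketch using a non-blocking send. The helper trySend and the buffer size of 2 are arbitrary choices for the example and say nothing about how the library sizes its queue.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value
// was accepted. A false result means a plain send would have blocked.
// (Hypothetical helper for illustration.)
func trySend(q chan []byte, data []byte) bool {
	select {
	case q <- data:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan []byte)
	buffered := make(chan []byte, 2)

	// With no receiver waiting, an unbuffered send cannot proceed.
	fmt.Println("unbuffered:", trySend(unbuffered, []byte("a")))

	// A buffered channel accepts sends until its buffer is full.
	fmt.Println("buffered #1:", trySend(buffered, []byte("a")))
	fmt.Println("buffered #2:", trySend(buffered, []byte("b")))
	fmt.Println("buffered #3:", trySend(buffered, []byte("c")))
}
```

So a buffer only postpones the problem: once the consumer stops draining and the buffer fills up, the send blocks just like the unbuffered case.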

@dwibudut
Contributor

dwibudut commented Sep 5, 2024

Hi @AndrewYEEE

I had the same problem. In my environment it might be caused by the client sending a burst of messages to the server at once, after which the client is disconnected at the network layer (for example a dropped VPN or a very slow 4G network). After the server hits a write timeout, the call inside the cleanupConnection method blocks.

I changed RLock() to Lock(), but it still blocked.

I tried the following and it works as a temporary solution:

func (server *Server) Write(webSocketId string, data []byte) error {
	server.connMutex.RLock()
	ws, ok := server.connections[webSocketId]
	server.connMutex.RUnlock()
	if !ok {
		return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
	}
	log.Debugf("queuing data for websocket %s", webSocketId)
	defer func() {
		if r := recover(); r != nil {
			log.Debugf("couldn't write to already closed websocket with id %v", webSocketId)
		}
	}()
	ws.outQueue <- data
	return nil
}
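
One detail of the workaround above: because Write has an unnamed return value, a recovered panic makes it return nil, so the caller cannot tell that the message was dropped. A possible variant, sketched here with the hypothetical names safeSend and errQueueClosed, uses a named return value so the deferred function can report the failure. Note that recover() catches any panic in the function, not only a send on a closed channel.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueClosed is a hypothetical sentinel error for this sketch.
var errQueueClosed = errors.New("websocket queue already closed")

// safeSend sends data on q and turns the panic caused by a send on a
// closed channel into an error instead of crashing or returning nil.
func safeSend(q chan []byte, data []byte) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// The named return value lets the deferred function set the result.
			err = errQueueClosed
		}
	}()
	q <- data
	return nil
}

func main() {
	q := make(chan []byte, 1)
	fmt.Println("open queue:", safeSend(q, []byte("a")))

	close(q)
	fmt.Println("closed queue:", safeSend(q, []byte("b")))
}
```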
