
WebSocket in WebFlux

Date: 2021-02-27 13:25:19

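Protocol

Reference: https://www.cnblogs.com/nuccch/p/10947256.html

WebSocket is an application-layer protocol built on TCP that provides two-way communication in client/server applications. The full specification is RFC 6455.
Note in particular: although WebSocket uses HTTP to establish the connection, this does not mean that WebSocket is implemented on top of HTTP.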

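Differences between WebSocket and HTTP

[Figure: WebSocket compared with HTTP]

At the application layer, the WebSocket protocol consists of two parts:

  • Handshake
  • Message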

Connection (Handshake)

A standard WebSocket connection request must include the following headers:

GET /xxx HTTP/1.1
# Host.
Host: server.example.com
# Protocol upgrade.
Upgrade: websocket
# Connection state.
Connection: Upgrade
# Random string generated by the WebSocket client.
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
# The WebSocket protocol version is 13.
Sec-WebSocket-Version: 13
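
To make the request above concrete, here is a minimal sketch of how a client could assemble it. The class name HandshakeRequestSketch and its two methods are hypothetical, introduced only for illustration. The one protocol fact it relies on is from RFC 6455: Sec-WebSocket-Key is a base64-encoded 16-byte random nonce.

```java
import java.security.SecureRandom;
import java.util.Base64;

/** Hypothetical helper that builds the text of the opening handshake request. */
final class HandshakeRequestSketch {

    /** Sec-WebSocket-Key: 16 random bytes, base64-encoded (24 characters). */
    static String newKey() {
        byte[] nonce = new byte[16];
        new SecureRandom().nextBytes(nonce);
        return Base64.getEncoder().encodeToString(nonce);
    }

    /** Assembles the request line and the required headers, each terminated by CRLF. */
    static String build(String host, String path, String key) {
        return "GET " + path + " HTTP/1.1\r\n"
                + "Host: " + host + "\r\n"
                + "Upgrade: websocket\r\n"
                + "Connection: Upgrade\r\n"
                + "Sec-WebSocket-Key: " + key + "\r\n"
                + "Sec-WebSocket-Version: 13\r\n"
                + "\r\n"; // an empty line ends the header section
    }
}
```

Calling HandshakeRequestSketch.build("server.example.com", "/xxx", HandshakeRequestSketch.newKey()) yields a request of the same shape as the one listed above, with a fresh key on every call.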

Data frames (Message)

WebSocket frame format:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
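
As a rough illustration of the layout above, the following sketch encodes an unmasked frame, which is how a server sends data to a client under RFC 6455. FrameEncoderSketch is a hypothetical name and this is not how Netty's encoder is written. It only shows how the FIN bit, the opcode and the three payload-length encodings (7 bits, 16 bits, 64 bits) end up on the wire.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder for unmasked frames (MASK bit = 0, no masking key). */
final class FrameEncoderSketch {

    static final int OPCODE_TEXT = 0x1;

    static byte[] encode(boolean fin, int opcode, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // First byte: FIN bit, RSV1-3 left at 0, 4-bit opcode.
        out.write((fin ? 0x80 : 0x00) | (opcode & 0x0F));
        int len = payload.length;
        if (len <= 125) {
            out.write(len);               // length fits in the 7-bit field
        } else if (len <= 0xFFFF) {
            out.write(126);               // marker: 16-bit extended length follows
            out.write(len >>> 8);
            out.write(len & 0xFF);
        } else {
            out.write(127);               // marker: 64-bit extended length follows
            for (int shift = 56; shift >= 0; shift -= 8) {
                out.write((int) (((long) len >>> shift) & 0xFF));
            }
        }
        out.write(payload, 0, len);
        return out.toByteArray();
    }

    /** Convenience method: a single, final text frame. */
    static byte[] text(String s) {
        return encode(true, OPCODE_TEXT, s.getBytes(StandardCharsets.UTF_8));
    }
}
```

FrameEncoderSketch.text("Hello") produces the bytes 0x81 0x05 0x48 0x65 0x6c 0x6c 0x6f, which matches the single-frame unmasked text example in RFC 6455.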

WebFlux integration

  1. Implement WebSocketHandler

    EchoWebSocketHandler implements WebSocketHandler

    public class EchoWebSocketHandler implements WebSocketHandler {
    
        public EchoWebSocketHandler() {
        }
    
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // Use retain() for Reactor Netty
            return session.send(session.receive().doOnNext(WebSocketMessage::retain));
        }
    }
    
  2. Register a HandlerMapping

    This maps /echo to EchoWebSocketHandler

        @Bean
        public HandlerMapping handlerMapping() {
            Map<String, WebSocketHandler> map = new HashMap<>();
            map.put("/echo", new EchoWebSocketHandler());
    
            SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
            mapping.setUrlMap(map);
            return mapping;
        }
    
  3. Register a WebSocketHandlerAdapter bean

        @Bean
        WebSocketHandlerAdapter webSocketHandlerAdapter(){
            return new WebSocketHandlerAdapter();
        }
    

How it works

1.DispatcherHandler

With SimpleUrlHandlerMapping and WebSocketHandlerAdapter registered, let's see how a request from the browser is handled.

	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		return Flux.fromIterable(this.handlerMappings)
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				.flatMap(handler -> invokeHandler(exchange, handler))
				.flatMap(result -> handleResult(exchange, result));
	}
  • mapping.getHandler(exchange) returns the EchoWebSocketHandler
  • invokeHandler calls WebSocketHandlerAdapter to process the EchoWebSocketHandler
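
The lookup described above (take the first mapping that yields a handler, then the first adapter that supports it) can be sketched without any reactive types. Everything in this block is a hypothetical simplification: Mapping, Adapter, urlMapping and typeAdapter are stand-ins invented for illustration, not Spring APIs, and the real DispatcherHandler does this work asynchronously with Flux and Mono.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, synchronous stand-ins for HandlerMapping and HandlerAdapter. */
final class DispatchSketch {

    interface Mapping {
        Optional<Object> getHandler(String path);
    }

    interface Adapter {
        boolean supports(Object handler);
        String handle(String path, Object handler);
    }

    /** A mapping backed by a path-to-handler map, loosely like SimpleUrlHandlerMapping. */
    static Mapping urlMapping(Map<String, Object> urlMap) {
        return path -> Optional.ofNullable(urlMap.get(path));
    }

    /** An adapter that accepts handlers of one type, loosely like WebSocketHandlerAdapter.supports. */
    static Adapter typeAdapter(Class<?> type) {
        return new Adapter() {
            @Override public boolean supports(Object handler) {
                return type.isInstance(handler);
            }
            @Override public String handle(String path, Object handler) {
                return "handled " + path + " with " + handler;
            }
        };
    }

    /** First mapping with a handler wins; then the first adapter that supports it is invoked. */
    static String dispatch(List<Mapping> mappings, List<Adapter> adapters, String path) {
        for (Mapping mapping : mappings) {
            Optional<Object> handler = mapping.getHandler(path);
            if (handler.isPresent()) {
                for (Adapter adapter : adapters) {
                    if (adapter.supports(handler.get())) {
                        return adapter.handle(path, handler.get());
                    }
                }
                throw new IllegalStateException("No adapter for handler " + handler.get());
            }
        }
        return "404 NOT_FOUND"; // corresponds to createNotFoundError() in the real code
    }
}
```

In this sketch a String stands in for the handler object; in the article's setup the handler would be the EchoWebSocketHandler and the adapter the WebSocketHandlerAdapter.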

2.WebSocketHandlerAdapter

The supports method returns true for EchoWebSocketHandler.

The handle method calls the handleRequest method of WebSocketService.

	@Override
	public boolean supports(Object handler) {
		return WebSocketHandler.class.isAssignableFrom(handler.getClass());
	}

	@Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
		WebSocketHandler webSocketHandler = (WebSocketHandler) handler;
		return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty());
	}

3.WebSocketService

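The implementation class is HandshakeWebSocketService.

It validates the HTTP request headers:

  • GET method
  • Upgrade: websocket
  • Connection: Upgrade
  • Sec-WebSocket-Key: S7SmUANSlEG47sjuY9C2sg==
  • Sec-WebSocket-Protocol: (optional)

RequestUpgradeStrategy then handles the upgrade of the request, that is, it switches the connection from HTTP to WebSocket. All subsequent data is exchanged as WebSocket data frames.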

	public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {
		ServerHttpRequest request = exchange.getRequest();
		HttpMethod method = request.getMethod();
		HttpHeaders headers = request.getHeaders();

		if (HttpMethod.GET != method) {
			return Mono.error(new MethodNotAllowedException(
					request.getMethodValue(), Collections.singleton(HttpMethod.GET)));
		}

		if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
			return handleBadRequest(exchange, "Invalid 'Upgrade' header: " + headers);
		}

		List<String> connectionValue = headers.getConnection();
		if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
			return handleBadRequest(exchange, "Invalid 'Connection' header: " + headers);
		}

		String key = headers.getFirst(SEC_WEBSOCKET_KEY);
		if (key == null) {
			return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header");
		}

		String protocol = selectProtocol(headers, handler);

		return initAttributes(exchange).flatMap(attributes ->
				this.upgradeStrategy.upgrade(exchange, handler, protocol,
						() -> createHandshakeInfo(exchange, request, protocol, attributes))
		);
	}
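
The header checks performed by handleRequest can be tried out in isolation with a plain-Java sketch. UpgradeCheckSketch is a hypothetical helper, not part of Spring. It assumes header names have already been lower-cased by the caller, and unlike the code above it compares Connection tokens case-insensitively.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-alone version of the handshake precondition checks. */
final class UpgradeCheckSketch {

    /**
     * Returns null when the request may be upgraded, otherwise a short reason.
     * Assumption: the keys of the headers map are lower-case header names.
     */
    static String validate(String method, Map<String, String> headers) {
        if (!"GET".equals(method)) {
            return "405 Method Not Allowed";
        }
        String upgrade = headers.get("upgrade");
        if (upgrade == null || !upgrade.equalsIgnoreCase("websocket")) {
            return "400 Invalid 'Upgrade' header";
        }
        String connection = headers.get("connection");
        if (connection == null || !containsToken(connection, "upgrade")) {
            return "400 Invalid 'Connection' header";
        }
        if (headers.get("sec-websocket-key") == null) {
            return "400 Missing \"Sec-WebSocket-Key\" header";
        }
        return null;
    }

    /** Connection may carry several comma-separated tokens, e.g. "keep-alive, Upgrade". */
    static boolean containsToken(String value, String token) {
        for (String part : value.split(",")) {
            if (part.trim().toLowerCase(Locale.ROOT).equals(token)) {
                return true;
            }
        }
        return false;
    }
}
```

A request that passes all four checks returns null here, which corresponds to the point where the real code goes on to selectProtocol and the upgrade strategy.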

4.RequestUpgradeStrategy

The Reactor Netty implementation is ReactorNettyRequestUpgradeStrategy.

It calls HttpServerOperations.sendWebsocket for further processing.

	public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
			@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

		ServerHttpResponse response = exchange.getResponse();
		HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
		HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
		NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

		return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
				(in, out) -> {
					ReactorNettyWebSocketSession session =
							new ReactorNettyWebSocketSession(
									in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
					return handler.handle(session);
				});
	}

Handshake information: HandshakeInfo

	private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
			@Nullable String protocol, Map<String, Object> attributes) {

		URI uri = request.getURI();
		// Copy request headers, as they might be pooled and recycled by
		// the server implementation once the handshake HTTP exchange is done.
		HttpHeaders headers = new HttpHeaders();
		headers.addAll(request.getHeaders());
		Mono<Principal> principal = exchange.getPrincipal();
		String logPrefix = exchange.getLogPrefix();
		InetSocketAddress remoteAddress = request.getRemoteAddress();
		return new HandshakeInfo(uri, headers, principal, protocol, remoteAddress, attributes, logPrefix);
	}

5.HttpServerOperations

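  • WebsocketServerOperations is responsible for performing the handshake
  • rebind sets the channel's ReactorNetty.CONNECTION attribute to the WebsocketServerOperations instance
  • Once the handshake succeeds, the WebsocketHandler is bound to the WebsocketServerOperations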
    public Mono<Void> sendWebsocket(@Nullable String protocols, int maxFramePayloadLength, BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
        return this.withWebsocketSupport(this.uri(), protocols, maxFramePayloadLength, websocketHandler);
    }
    
        final Mono<Void> withWebsocketSupport(String url, @Nullable String protocols, int maxFramePayloadLength, BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
        Objects.requireNonNull(websocketHandler, "websocketHandler");
        if (this.markSentHeaders()) {
            WebsocketServerOperations ops = new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this);
            if (this.rebind(ops)) {
                return FutureMono.from(ops.handshakerResult).doOnEach((signal) -> {
                    if (!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
                        ((Publisher)websocketHandler.apply(ops, ops)).subscribe(new HttpServerOperations.WebsocketSubscriber(ops, signal.getContext()));
                    }

                });
            }
        } else {
            log.error(ReactorNetty.format(this.channel(), "Cannot enable websocket if headers have already been sent"));
        }

        return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
    }

Establishing the handshake

  • Create a WebSocketServerHandshakerFactory

  • Create the WebSocketServerHandshaker with WebSocketServerHandshakerFactory.newHandshaker

    The handshaker implementation is chosen according to the Sec-WebSocket-Version request header

      public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
            CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
            if (version != null) {
                if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
                    return new WebSocketServerHandshaker13(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch);
                } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
                    return new WebSocketServerHandshaker08(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch);
                } else {
                    return version.equals(WebSocketVersion.V07.toHttpHeaderValue()) ? new WebSocketServerHandshaker07(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch) : null;
                }
            } else {
                return new WebSocketServerHandshaker00(this.webSocketURL, this.subprotocols, this.maxFramePayloadLength);
            }
        }
    
  • Remove reactor.left.httpTrafficHandler

  • WebSocketServerHandshaker.handshake performs the handshake

    WebsocketServerOperations(String wsUrl, @Nullable String protocols, int maxFramePayloadLength, HttpServerOperations replaced) {
        super(replaced);
        Channel channel = replaced.channel();
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(wsUrl, protocols, true, maxFramePayloadLength);
        this.handshaker = wsFactory.newHandshaker(replaced.nettyRequest);
        if (this.handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
            this.handshakerResult = null;
        } else {
            this.removeHandler("reactor.left.httpTrafficHandler");
            this.handshakerResult = channel.newPromise();
            HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri());
            request.headers().set(replaced.nettyRequest.headers());
            if (this.channel().pipeline().get("reactor.left.compressionHandler") != null) {
                this.removeHandler("reactor.left.compressionHandler");
                WebSocketServerCompressionHandler wsServerCompressionHandler = new WebSocketServerCompressionHandler();

                try {
                    wsServerCompressionHandler.channelRead(channel.pipeline().context("reactor.right.reactiveBridge"), request);
                    this.addHandlerFirst("reactor.left.wsCompressionHandler", wsServerCompressionHandler);
                } catch (Throwable var10) {
                    log.error(ReactorNetty.format(this.channel(), ""), var10);
                }
            }

            this.handshaker.handshake(channel, request, replaced.responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING), this.handshakerResult).addListener((f) -> {
                this.markPersistent(false);
            });
        }

    }

6.WebSocketServerHandshaker

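The implementations are WebSocketServerHandshaker13, WebSocketServerHandshaker08, WebSocketServerHandshaker07 and WebSocketServerHandshaker00.

  • Create the HandshakeResponse

    It mainly sets three headers

    [Figure: headers of the handshake response]

  • Remove HttpObjectAggregator from the pipeline

  • Remove HttpContentCompressor from the pipeline

  • Add the WebSocket decoder and the WebSocket encoder before HttpServerCodec in the pipeline

  • Write the HandshakeResponse to the channel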

    public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) {
        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, this.version());
        }

        FullHttpResponse response = this.newHandshakeResponse(req, responseHeaders);
        ChannelPipeline p = channel.pipeline();
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);
        }

        if (p.get(HttpContentCompressor.class) != null) {
            p.remove(HttpContentCompressor.class);
        }

        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
        final String encoderName;
        if (ctx == null) {
            ctx = p.context(HttpServerCodec.class);
            if (ctx == null) {
                promise.setFailure(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }

            p.addBefore(ctx.name(), "wsdecoder", this.newWebsocketDecoder());
            p.addBefore(ctx.name(), "wsencoder", this.newWebSocketEncoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", this.newWebsocketDecoder());
            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", this.newWebSocketEncoder());
        }

        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    p.remove(encoderName);
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }

            }
        });
        return promise;
    }
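
One of the headers in a version 13 handshake response is Sec-WebSocket-Accept, which proves to the client that the server understood the request. RFC 6455 defines it as the base64-encoded SHA-1 digest of the client's Sec-WebSocket-Key concatenated with a fixed GUID. The sketch below shows that computation on its own. WebSocketAcceptSketch is a hypothetical class name, not the Netty implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper that derives Sec-WebSocket-Accept from Sec-WebSocket-Key. */
final class WebSocketAcceptSketch {

    /** Fixed GUID defined by RFC 6455. */
    static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String accept(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            // The key is used as the literal header text; it is not base64-decoded first.
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}
```

For the sample key used earlier, dGhlIHNhbXBsZSBub25jZQ==, the result is s3pPLMBiTxaQ9kYGzzhZRbK+xOo=, the same value given in the RFC 6455 example.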

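The Netty pipeline before and after the handshake compares as follows:

[Figure: Netty pipeline before and after the handshake]

Note: WebSocketFrameAggregator is added in the org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive method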

	@Override
	public Flux<WebSocketMessage> receive() {
		return getDelegate().getInbound()
				.aggregateFrames(this.maxFramePayloadLength)
				.receiveFrames()
				.map(super::toMessage)
				.doOnNext(message -> {
					if (logger.isTraceEnabled()) {
						logger.trace(getLogPrefix() + "Received " + message);
					}
				});
	}
	
	    default WebsocketInbound aggregateFrames(int maxContentLength) {
        this.withConnection((c) -> {
            c.addHandlerLast(new WebSocketFrameAggregator(maxContentLength));
        });
        return this;
    }
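
To show what frame aggregation means in practice, here is a minimal sketch that joins a fragmented message (a first frame with FIN=0 followed by continuation frames) into one complete frame. FrameAggregatorSketch and its nested Frame class are hypothetical and much simpler than Netty's WebSocketFrameAggregator. In particular, the sketch assumes control frames such as ping are handled elsewhere and never reach it.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical aggregator: joins fragments of one message into a single frame. */
final class FrameAggregatorSketch {

    static final int OPCODE_CONT = 0x0;

    /** Minimal frame model: FIN flag, opcode, payload bytes. */
    static final class Frame {
        final boolean fin;
        final int opcode;
        final byte[] payload;

        Frame(boolean fin, int opcode, byte[] payload) {
            this.fin = fin;
            this.opcode = opcode;
            this.payload = payload;
        }
    }

    private final int maxContentLength;
    private ByteArrayOutputStream pending; // null while no fragmented message is in progress
    private int pendingOpcode;

    FrameAggregatorSketch(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    /** Returns a complete message, or null while more fragments are expected. */
    Frame offer(Frame frame) {
        if (pending == null) {
            if (frame.opcode == OPCODE_CONT) {
                throw new IllegalStateException("continuation frame outside a fragmented message");
            }
            if (frame.fin) {
                return frame; // unfragmented message, pass through unchanged
            }
            pending = new ByteArrayOutputStream();
            pendingOpcode = frame.opcode; // the opcode of the first fragment defines the message type
        } else if (frame.opcode != OPCODE_CONT) {
            throw new IllegalStateException("expected a continuation frame");
        }
        if (pending.size() + frame.payload.length > maxContentLength) {
            pending = null;
            throw new IllegalStateException("message exceeds " + maxContentLength + " bytes");
        }
        pending.write(frame.payload, 0, frame.payload.length);
        if (!frame.fin) {
            return null;
        }
        Frame whole = new Frame(true, pendingOpcode, pending.toByteArray());
        pending = null;
        return whole;
    }
}
```

Offering a text frame "Hel" with FIN=0 returns null, and the following continuation frame "lo" with FIN=1 returns a single text frame whose payload is "Hello".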

7.Binding the WebsocketHandler

reactor.netty.http.server.HttpServerOperations#withWebsocketSupport

FutureMono.from(ops.handshakerResult)
				                 .doOnEach(signal -> {
				                 	if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
					                    websocketHandler.apply(ops, ops)
					                                    .subscribe(new WebsocketSubscriber(ops, signal.getContext()));
				                    }
				                 });

websocketHandler.apply actually invokes the lambda passed in by org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy#upgrade

(in, out) -> {
					ReactorNettyWebSocketSession session =
							new ReactorNettyWebSocketSession(
									in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
					return handler.handle(session);
				})

The actual class of handler is EchoWebSocketHandler,

so the call finally reaches

    public Mono<Void> handle(WebSocketSession session) {
        // Use retain() for Reactor Netty
        return session.send(session.receive().doOnNext(WebSocketMessage::retain));
    }

8.WebSocket decoding

The core decoding logic is in the WebSocket08FrameDecoder class.

Here is the WebSocket frame format again:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
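  • READING_FIRST parses the first byte
  • READING_SECOND parses the second byte
  • READING_SIZE reads the payload length
  • MASKING_KEY reads the 4-byte masking key (only when the mask bit read in READING_SECOND is 1)
  • PAYLOAD reads the payload into a buffer according to the length
  • For a text frame, the result is wrapped in a TextWebSocketFrame and added to the out list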
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        // Discard all data received if closing handshake was received before.
        if (receivedClosingHandshake) {
            in.skipBytes(actualReadableBytes());
            return;
        }
            switch (state) {
                case READING_FIRST:
                    if (!in.isReadable()) {
                        return;
                    }

                    framePayloadLength = 0;

                    // FIN, RSV, OPCODE
                    byte b = in.readByte();
                    frameFinalFlag = (b & 0x80) != 0;
                    frameRsv = (b & 0x70) >> 4;
                    frameOpcode = b & 0x0F;

                    if (logger.isDebugEnabled()) {
                        logger.debug("Decoding WebSocket Frame opCode={}", frameOpcode);
                    }

                    state = State.READING_SECOND;
                case READING_SECOND:
                    if (!in.isReadable()) {
                        return;
                    }
                    // MASK, PAYLOAD LEN 1
                    b = in.readByte();
                    frameMasked = (b & 0x80) != 0;
                    framePayloadLen1 = b & 0x7F;

                    if (frameRsv != 0 && !allowExtensions) {
                        protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
                        return;
                    }

                    if (!allowMaskMismatch && expectMaskedFrames != frameMasked) {
                        protocolViolation(ctx, "received a frame that is not masked as expected");
                        return;
                    }

                    if (frameOpcode > 7) { // control frame (have MSB in opcode set)

                        // control frames MUST NOT be fragmented
                        if (!frameFinalFlag) {
                            protocolViolation(ctx, "fragmented control frame");
                            return;
                        }

                        // control frames MUST have payload 125 octets or less
                        if (framePayloadLen1 > 125) {
                            protocolViolation(ctx, "control frame with payload length > 125 octets");
                            return;
                        }

                        // check for reserved control frame opcodes
                        if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
                                || frameOpcode == OPCODE_PONG)) {
                            protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode);
                            return;
                        }

                        // close frame : if there is a body, the first two bytes of the
                        // body MUST be a 2-byte unsigned integer (in network byte
                        // order) representing a getStatus code
                        if (frameOpcode == 8 && framePayloadLen1 == 1) {
                            protocolViolation(ctx, "received close control frame with payload len 1");
                            return;
                        }
                    } else { // data frame
                        // check for reserved data frame opcodes
                        if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
                                || frameOpcode == OPCODE_BINARY)) {
                            protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode);
                            return;
                        }

                        // check opcode vs message fragmentation state 1/2
                        if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {
                            protocolViolation(ctx, "received continuation data frame outside fragmented message");
                            return;
                        }

                        // check opcode vs message fragmentation state 2/2
                        if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
                            protocolViolation(ctx,
                                    "received non-continuation data frame while inside fragmented message");
                            return;
                        }
                    }

                    state = State.READING_SIZE;
                 case READING_SIZE:

                    // Read frame payload length
                    if (framePayloadLen1 == 126) {
                        if (in.readableBytes() < 2) {
                            return;
                        }
                        framePayloadLength = in.readUnsignedShort();
                        if (framePayloadLength < 126) {
                            protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
                            return;
                        }
                    } else if (framePayloadLen1 == 127) {
                        if (in.readableBytes() < 8) {
                            return;
                        }
                        framePayloadLength = in.readLong();
                        // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
                        // just check if it's negative?

                        if (framePayloadLength < 65536) {
                            protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
                            return;
                        }
                    } else {
                        framePayloadLength = framePayloadLen1;
                    }

                    if (framePayloadLength > maxFramePayloadLength) {
                        protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded.");
                        return;
                    }

                    if (logger.isDebugEnabled()) {
                        logger.debug("Decoding WebSocket Frame length={}", framePayloadLength);
                    }

                    state = State.MASKING_KEY;
                case MASKING_KEY:
                    if (frameMasked) {
                        if (in.readableBytes() < 4) {
                            return;
                        }
                        if (maskingKey == null) {
                            maskingKey = new byte[4];
                        }
                        in.readBytes(maskingKey);
                    }
                    state = State.PAYLOAD;
                case PAYLOAD:
                    if (in.readableBytes() < framePayloadLength) {
                        return;
                    }

                    ByteBuf payloadBuffer = null;
                    try {
                        payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));

                        // Now we have all the data, the next checkpoint must be the next
                        // frame
                        state = State.READING_FIRST;

                        // Unmask data if needed
                        if (frameMasked) {
                            unmask(payloadBuffer);
                        }

                        // Processing ping/pong/close frames because they cannot be
                        // fragmented
                        if (frameOpcode == OPCODE_PING) {
                            out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                            payloadBuffer = null;
                            return;
                        }
                        if (frameOpcode == OPCODE_PONG) {
                            out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                            payloadBuffer = null;
                            return;
                        }
                        if (frameOpcode == OPCODE_CLOSE) {
                            receivedClosingHandshake = true;
                            checkCloseFrameBody(ctx, payloadBuffer);
                            out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                            payloadBuffer = null;
                            return;
                        }

                        // Processing for possible fragmented messages for text and binary
                        // frames
                        if (frameFinalFlag) {
                            // Final frame of the sequence. Apparently ping frames are
                            // allowed in the middle of a fragmented message
                            if (frameOpcode != OPCODE_PING) {
                                fragmentedFramesCount = 0;
                            }
                        } else {
                            // Increment counter
                            fragmentedFramesCount++;
                        }

                        // Return the frame
                        if (frameOpcode == OPCODE_TEXT) {
                            out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                            payloadBuffer = null;
                            return;
                        } else if (frameOpcode == OPCODE_BINARY) {
                            out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                            payloadBuffer = null;
                            return;
                        } else if (frameOpcode == OPCODE_CONT) {
                            out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
                                    payloadBuffer));
                            payloadBuffer = null;
                            return;
                        } else {
                            throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
                                    + frameOpcode);
                        }
                    } finally {
                        if (payloadBuffer != null) {
                            payloadBuffer.release();
                        }
                    }
                case CORRUPT:
                    if (in.isReadable()) {
                        // If we don't keep reading Netty will throw an exception saying
                        // we can't return null if no bytes read and state not changed.
                        in.readByte();
                    }
                    return;
                default:
                    throw new Error("Shouldn't reach here.");
            }
    }
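
The state sequence of the decoder can be followed more easily on a complete frame held in memory. The sketch below walks through the same steps in order (first byte, second byte, extended length, masking key, payload) and unmasks the payload by XOR-ing each byte with the key byte at index i mod 4, as RFC 6455 specifies. FrameDecoderSketch is a hypothetical class. Unlike the Netty decoder it assumes the whole frame is already available and performs none of the protocol-violation checks.

```java
import java.nio.ByteBuffer;

/** Hypothetical one-shot decoder for a single, complete frame. */
final class FrameDecoderSketch {

    static final class Decoded {
        final boolean fin;
        final int opcode;
        final byte[] payload;

        Decoded(boolean fin, int opcode, byte[] payload) {
            this.fin = fin;
            this.opcode = opcode;
            this.payload = payload;
        }
    }

    static Decoded decode(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes); // big-endian, i.e. network byte order

        int b = in.get() & 0xFF;                // READING_FIRST: FIN, RSV, opcode
        boolean fin = (b & 0x80) != 0;
        int opcode = b & 0x0F;

        b = in.get() & 0xFF;                    // READING_SECOND: MASK, 7-bit length
        boolean masked = (b & 0x80) != 0;
        long length = b & 0x7F;

        if (length == 126) {                    // READING_SIZE: extended length
            length = in.getShort() & 0xFFFF;
        } else if (length == 127) {
            length = in.getLong();
        }

        byte[] key = new byte[4];               // MASKING_KEY: present only if MASK = 1
        if (masked) {
            in.get(key);
        }

        if (length < 0 || length > in.remaining()) {
            throw new IllegalArgumentException("incomplete or oversized frame");
        }
        byte[] payload = new byte[(int) length]; // PAYLOAD
        in.get(payload);
        if (masked) {
            for (int i = 0; i < payload.length; i++) {
                payload[i] ^= key[i % 4];
            }
        }
        return new Decoded(fin, opcode, payload);
    }
}
```

Decoding the bytes 0x81 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58 (the masked text example from RFC 6455) gives FIN=true, opcode 1 and the payload "Hello".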

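9.Data frame handling

Continuing from above:

  • After wsdecoder has processed the data, it is wrapped in a TextWebSocketFrame

  • WebSocketFrameAggregator iterates over the TextWebSocketFrame data and passes each frame to the next handler in the pipeline

  • The handler named reactiveBridge is ChannelOperationsHandler, which delegates to the Connection bound to ReactorNetty.CONNECTION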

reactor.netty.channel.ChannelOperationsHandler#channelRead

ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
			if (ops != null) {
				ops.onInboundNext(ctx, msg);
			}

The call ends up in FluxReceive.onInboundNext.

Now let's look at how the processing chain around the FluxReceive object is assembled.

com.example.demo.EchoWebSocketHandler#handle

@Override
public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
}

org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive

	public Flux<WebSocketMessage> receive() {
		return getDelegate().getInbound()
				.aggregateFrames(this.maxFramePayloadLength)
				.receiveFrames()   // 1  FluxFilter   2  FluxMap
				.map(super::toMessage) //3  FluxMap
				.doOnNext(message -> {  // 4  FluxPeek
					if (logger.isTraceEnabled()) {
						logger.trace(getLogPrefix() + "Received " + message);
					}
				});
	}

com.example.demo.EchoWebSocketHandler#handle

session.receive().doOnNext(WebSocketMessage::retain)   // doOnNext 5 FluxPeek

org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#send

	public Mono<Void> send(Publisher<WebSocketMessage> messages) {
		Flux<WebSocketFrame> frames = Flux.from(messages)
				.doOnNext(message -> {  //6 FluxPeek
					if (logger.isTraceEnabled()) {  
						logger.trace(getLogPrefix() + "Sending " + message);
					}
				})
				.map(this::toFrame);  //7 FluxMap
		return getDelegate().getOutbound()
				.options(NettyPipeline.SendOptions::flushOnEach)
				.sendObject(frames)  //8 MonoSendMany
				.then();
	}

reactor.netty.http.server.HttpServerOperations#withWebsocketSupport

websocketHandler.apply(ops, ops).subscribe(new WebsocketSubscriber(ops, signal.getContext()));   //9 WebsocketSubscriber

Original article: https://www.cnblogs.com/huiyao/p/14454510.html