码迷,mamicode.com
首页 > 其他好文 > 详细

Flink 消费RabbitMQ 和 Kafka

时间:2021-06-11 19:15:45      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:could not   定时   select   offset   mqc   apach   live   read   deb   

在消息RabbitMQ时,我们关心的一个问题是手动ack还是自动ack,如果是自动ack就怕出现丢消息的情况
Flink以RabbitMQ作为Source,是怎么保证消息唯一性的呢,是怎么保证ack的.

 

首先引入依赖包
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>               

 

RMQSource类,可以看到如果设置了checkpointing,则默认autoAck是false,是手动控制提交的
那什么时候提交呢,flink checkpointing有个时间间隔,每次checkpointing触发时,才能ack,也就是说,不是一条消息ack一下,而是定时ack
这个跟kafka,update offset一样,都是在checkpoint的时候处理 @Override public void open(Configuration config) throws Exception { super.open(config); ConnectionFactory factory = setupConnectionFactory(); try { connection = factory.newConnection(); channel = connection.createChannel(); if (channel == null) { throw new RuntimeException("None of RabbitMQ channels are available"); } setupQueue(); consumer = new QueueingConsumer(channel); RuntimeContext runtimeContext = getRuntimeContext(); if (runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) { autoAck = false; // enables transaction mode channel.txSelect(); } else { autoAck = true; } LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck); channel.basicConsume(queueName, autoAck, consumer); } catch (IOException e) { throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at " + rmqConnectionConfig.getHost(), e); } running = true; }

 

RMQSource

@Override
	public void run(SourceContext<OUT> ctx) throws Exception {
		while (running) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();

			synchronized (ctx.getCheckpointLock()) {

				OUT result = schema.deserialize(delivery.getBody());

				if (schema.isEndOfStream(result)) {
					break;
				}

				if (!autoAck) {
					final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
					if (usesCorrelationId) {
						final String correlationId = delivery.getProperties().getCorrelationId();
						Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
							"with usesCorrelationId set to true but a message was received with " +
							"correlation id set to null!");
						if (!addId(correlationId)) {
							// we have already processed this message
							continue;
						}
					}
					sessionIds.add(deliveryTag);
				}

				ctx.collect(result);
			}
		}
	}

 

@Override
	protected void acknowledgeSessionIDs(List<Long> sessionIds) {
		try {
			for (long id : sessionIds) {
				channel.basicAck(id, false);
			}
			channel.txCommit();
		} catch (IOException e) {
			throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
		}
	}

 

Flink 消费RabbitMQ 和 Kafka

标签:could not   定时   select   offset   mqc   apach   live   read   deb   

原文地址:https://www.cnblogs.com/long-yuan/p/14875470.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!