码迷,mamicode.com
首页 > 编程语言 > 详细

jstorm在使用kafka作为spout的时候多线程问题

时间:2017-08-06 12:46:32      阅读:177      评论:0      收藏:0      [点我收藏+]

标签:font   space   back   报错   block   fms   turn   sort   strong   

  jstorm在使用kafka作为spout的时候,高并发情况下会出现多线程报错问题
需要对这两个类进行适当的修改来避免上述问题:
  storm.kafka.PartitionManager
  storm.kafka.ExponentialBackoffMsgRetryManager
1.storm.kafka.PartitionManager的修改
//将变量
private SortedMap<Long, Long> _pending = new TreeMap();
//改为:
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());

/**----------------------------------------------------------------------------------------------------**/

//将方法
public long lastCompletedOffset() {
    return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
//改为:
public long lastCompletedOffset() {
    synchronized (_pending) {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }
}

2.storm.kafka.ExponentialBackoffMsgRetryManager的修改
//
private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator()); 
private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap();
//改为:
private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator()); 
private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();

 

jstorm在使用kafka作为spout的时候多线程问题

标签:font   space   back   报错   block   fms   turn   sort   strong   

原文地址:http://www.cnblogs.com/Stubborn-Ant/p/7293987.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!