Skip to content

Commit

Permalink
Merge tag 'v0.9.2-incubating' into security
Browse files Browse the repository at this point in the history
[maven-release-plugin]  copy for tag v0.9.2-incubating

Conflicts:
	storm-core/pom.xml
  • Loading branch information
revans2 committed Jun 19, 2014
2 parents 87cdbf5 + 24d4a14 commit 79089ad
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 33 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## 0.9.3-incubating (unreleased)
* STORM-338: Move towards idiomatic Clojure style

## 0.9.2-incubating
* STORM-66: send taskid on initial handshake
* STORM-342: Contention in Disruptor Queue which may cause out of order or lost messages
* STORM-338: Move towards idiomatic Clojure style
* STORM-335: add drpc test for removing timed out requests from queue
* STORM-69: Storm UI Visualizations for Topologies
* STORM-297: Performance scaling with CPU
Expand Down
2 changes: 1 addition & 1 deletion examples/storm-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/storm-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void fail(Long offset) {

public void commit() {
long lastCompletedOffset = lastCompletedOffset();
if (lastCompletedOffset != lastCompletedOffset) {
if (_committedTo != lastCompletedOffset) {
LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
Expand Down Expand Up @@ -164,7 +164,7 @@
<scm>
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</developerConnection>
<tag>HEAD</tag>
<tag>v0.9.2-incubating</tag>
<url>https://git-wip-us.apache.org/repos/asf/incubator-storm</url>
</scm>

Expand Down
2 changes: 1 addition & 1 deletion storm-buildtools/maven-shade-clojure-transformer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
14 changes: 13 additions & 1 deletion storm-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
Expand Down Expand Up @@ -191,9 +191,21 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.clojars.runa</groupId>
<artifactId>conjure</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
<testSourceDirectory>test/jvm</testSourceDirectory>
<resources>
<resource>
<directory>../conf</directory>
Expand Down
11 changes: 11 additions & 0 deletions storm-core/src/jvm/backtype/storm/task/TopologyContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
import org.json.simple.JSONValue;

/**
* A TopologyContext is given to bolts and spouts in their "prepare" and "open"
Expand Down Expand Up @@ -217,6 +218,16 @@ public void addTaskHook(ITaskHook hook) {
public Collection<ITaskHook> getHooks() {
return _hooks;
}

@Override
public String toJSONString() {
Map obj = new HashMap();
obj.put("task->component", this.getTaskToComponent());
obj.put("taskid", this.getThisTaskId());
// TODO: jsonify StormTopology
// at the minimum should send source info
return JSONValue.toJSONString(obj);
}

/*
* Register a IMetric instance.
Expand Down
70 changes: 49 additions & 21 deletions storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
*
Expand All @@ -51,6 +53,11 @@ public class DisruptorQueue implements IStatefulObject {
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();

private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
private final Lock readLock = cacheLock.readLock();
private final Lock writeLock = cacheLock.writeLock();

private static String PREFIX = "disruptor-";
private String _queueName = "";

Expand All @@ -62,6 +69,13 @@ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait)
_buffer.setGatingSequences(_consumer);
if(claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
} else {
// make sure we flush the pending messages in cache first
try {
publishDirect(FLUSH_CACHE, true);
} catch (InsufficientCapacityException e) {
throw new RuntimeException("This code should be unreachable!", e);
}
}
}

Expand Down Expand Up @@ -134,33 +148,47 @@ public void tryPublish(Object obj) throws InsufficientCapacityException {
}

public void publish(Object obj, boolean block) throws InsufficientCapacityException {
if(consumerStartedFlag) {
final long id;
if(block) {
id = _buffer.next();
} else {
id = _buffer.tryNext(1);

boolean publishNow = consumerStartedFlag;

if (!publishNow) {
readLock.lock();
try {
publishNow = consumerStartedFlag;
if (!publishNow) {
_cache.add(obj);
}
} finally {
readLock.unlock();
}
final MutableObject m = _buffer.get(id);
m.setObject(obj);
_buffer.publish(id);
} else {
_cache.add(obj);
if(consumerStartedFlag) flushCache();
}

if (publishNow) {
publishDirect(obj, block);
}
}

public void consumerStarted() {
if(!consumerStartedFlag) {
consumerStartedFlag = true;
flushCache();
private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
final long id;
if(block) {
id = _buffer.next();
} else {
id = _buffer.tryNext(1);
}
final MutableObject m = _buffer.get(id);
m.setObject(obj);
_buffer.publish(id);
}

private void flushCache() {
publish(FLUSH_CACHE);
}
public void consumerStarted() {

consumerStartedFlag = true;

// Use writeLock to make sure all pending cache add opearation completed
writeLock.lock();
writeLock.unlock();
}

public long population() { return (writePos() - readPos()); }
public long capacity() { return _buffer.getBufferSize(); }
public long writePos() { return _buffer.getCursor(); }
Expand Down
153 changes: 153 additions & 0 deletions storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.utils;

import java.util.concurrent.atomic.AtomicBoolean;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.MultiThreadedClaimStrategy;
import org.junit.Assert;
import org.junit.Test;
import junit.framework.TestCase;

public class DisruptorQueueTest extends TestCase {

private final static int TIMEOUT = 5; // MS
private final static int PRODUCER_NUM = 4;

@Test
public void testMessageDisorder() throws InterruptedException {

// Set queue length to bigger enough
DisruptorQueue queue = createQueue("messageOrder", 16);

queue.publish("1");

Runnable producer = new Producer(queue, "2");

final Object [] result = new Object[1];
Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
private boolean head = true;

@Override
public void onEvent(Object obj, long sequence, boolean endOfBatch)
throws Exception {
if (head) {
head = false;
result[0] = obj;
}
}
});

run(producer, consumer);
Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
"1", result[0]);
}

@Test
public void testConsumerHang() throws InterruptedException {
final AtomicBoolean messageConsumed = new AtomicBoolean(false);

// Set queue length to 1, so that the RingBuffer can be easily full
// to trigger consumer blocking
DisruptorQueue queue = createQueue("consumerHang", 1);
Runnable producer = new Producer(queue, "msg");
Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
@Override
public void onEvent(Object obj, long sequence, boolean endOfBatch)
throws Exception {
messageConsumed.set(true);
}
});

run(producer, consumer);
Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
messageConsumed.get());
}


private void run(Runnable producer, Runnable consumer)
throws InterruptedException {

Thread[] producerThreads = new Thread[PRODUCER_NUM];
for (int i = 0; i < PRODUCER_NUM; i++) {
producerThreads[i] = new Thread(producer);
producerThreads[i].start();
}

Thread consumerThread = new Thread(consumer);
consumerThread.start();

for (int i = 0; i < PRODUCER_NUM; i++) {
producerThreads[i].interrupt();
producerThreads[i].join(TIMEOUT);
}
consumerThread.interrupt();
consumerThread.join(TIMEOUT);
}

private class Producer implements Runnable {
private String msg;
private DisruptorQueue queue;

Producer(DisruptorQueue queue, String msg) {
this.msg = msg;
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
queue.publish(msg, false);
}
} catch (InsufficientCapacityException e) {
return;
}
}
};

private class Consumer implements Runnable {
private EventHandler handler;
private DisruptorQueue queue;

Consumer(DisruptorQueue queue, EventHandler handler) {
this.handler = handler;
this.queue = queue;
}

@Override
public void run() {
queue.consumerStarted();
try {
while(true) {
queue.consumeBatchWhenAvailable(handler);
}
}catch(RuntimeException e) {
//break
}
}
};

private static DisruptorQueue createQueue(String name, int queueSize) {
return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
queueSize), new BlockingWaitStrategy());
}
}
2 changes: 1 addition & 1 deletion storm-dist/binary/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.9.3-incubating-SNAPSHOT</version>
<version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
Expand Down
Loading

0 comments on commit 79089ad

Please sign in to comment.