Skip to content

Commit

Permalink
remove 0MQ and replace with netty
Browse files Browse the repository at this point in the history
  • Loading branch information
ptgoetz committed Dec 10, 2013
1 parent 1bcc169 commit b63ed13
Show file tree
Hide file tree
Showing 23 changed files with 36 additions and 382 deletions.
1 change: 0 additions & 1 deletion MODULES
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
storm-console-logging
storm-core
storm-netty

31 changes: 0 additions & 31 deletions bin/install_zmq.sh

This file was deleted.

2 changes: 1 addition & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.zmq"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"

### nimbus.* configs are for the master
nimbus.host: "localhost"
Expand Down
2 changes: 1 addition & 1 deletion storm-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
[clj-time "0.4.1"]
[com.netflix.curator/curator-framework "1.0.1"
:exclusions [log4j/log4j]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
[hiccup "0.3.6"]
Expand All @@ -27,6 +26,7 @@
[com.google.guava/guava "13.0"]
[ch.qos.logback/logback-classic "1.0.6"]
[org.slf4j/log4j-over-slf4j "1.6.6"]
[io.netty/netty "3.6.3.Final"]
]

:source-paths ["src/clj"]
Expand Down
93 changes: 0 additions & 93 deletions storm-core/src/clj/backtype/storm/messaging/zmq.clj

This file was deleted.

2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1]
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
daemon-conf (merge (read-storm-config)
Expand Down
104 changes: 0 additions & 104 deletions storm-core/src/clj/zilch/mq.clj

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Random;
Expand All @@ -9,18 +20,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;

class Client implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private final int max_retries;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package backtype.storm.messaging.netty;

import java.util.Map;
import java.util.Vector;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;

import java.util.Map;
import java.util.Vector;

public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;

enum ControlMessage {
CLOSE_MESSAGE((short)-100),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package backtype.storm.messaging.netty;

import java.util.ArrayList;

import backtype.storm.messaging.TaskMessage;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;

import backtype.storm.messaging.TaskMessage;
import java.util.ArrayList;

class MessageBatch {
private int buffer_size;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package backtype.storm.messaging.netty;

import backtype.storm.messaging.TaskMessage;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import backtype.storm.messaging.TaskMessage;

public class MessageDecoder extends FrameDecoder {
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package backtype.storm.messaging.netty;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
Expand All @@ -14,10 +13,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class Server implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
Expand Down
Loading

0 comments on commit b63ed13

Please sign in to comment.