Skip to content

Commit f2ede58

Browse files
committed
toggle for NonBlockingHash{Map,Set}
1 parent 15c8e17 commit f2ede58

File tree

3 files changed

+81
-14
lines changed

3 files changed

+81
-14
lines changed

src/main/scala/scalang/Node.scala

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import java.util.concurrent.atomic._
1919
import org.jboss.{netty => netty}
2020
import netty.channel._
2121
import java.util.concurrent._
22+
import java.util.{Set => JSet}
2223
import scalang.node._
24+
import scalang.util.concurrent.ConcurrentHashSet
2325
import org.jctools.maps._
2426
import overlock.atomicmap._
2527
import org.jetlang._
@@ -184,6 +186,34 @@ trait Node extends ClusterListener with ClusterPublisher {
184186
def timer : HashedWheelTimer
185187
}
186188

189+
object ErlangNode {
190+
191+
def newConcurrentMap[K,V](config : NodeConfig): ConcurrentMap[K,V] = {
192+
if (config.useNBHM) {
193+
new NonBlockingHashMap[K,V]
194+
} else {
195+
new ConcurrentHashMap[K,V]
196+
}
197+
}
198+
199+
def newAtomicMap[K,V](config : NodeConfig): AtomicMap[K,V] = {
200+
if (config.useNBHM) {
201+
AtomicMap.atomicNBHM[K,V]
202+
} else {
203+
AtomicMap.atomicCHM[K,V]
204+
}
205+
}
206+
207+
def newConcurrentSet[E](config : NodeConfig): JSet[E] = {
208+
if (config.useNBHM) {
209+
new NonBlockingHashSet[E]
210+
} else {
211+
new ConcurrentHashSet[E]
212+
}
213+
}
214+
215+
}
216+
187217
class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) extends Node
188218
with ExitListener
189219
with SendListener
@@ -198,11 +228,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
198228
val tickTime = config.tickTime
199229
val poolFactory = config.poolFactory
200230
var creation : Int = 0
201-
val processes = new NonBlockingHashMap[Pid,ProcessAdapter]
202-
val registeredNames = new NonBlockingHashMap[Symbol,Pid]
203-
val channels = AtomicMap.atomicNBHM[Symbol,Channel]
204-
val links = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Link]]
205-
val monitors = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Monitor]]
231+
val processes = ErlangNode.newConcurrentMap[Pid,ProcessAdapter](config)
232+
val registeredNames = ErlangNode.newConcurrentMap[Symbol,Pid](config)
233+
val channels = ErlangNode.newAtomicMap[Symbol,Channel](config)
234+
val links = ErlangNode.newAtomicMap[Channel,JSet[Link]](config)
235+
val monitors = ErlangNode.newAtomicMap[Channel,JSet[Monitor]](config)
206236
val pidCount = new AtomicInteger(0)
207237
val pidSerial = new AtomicInteger(0)
208238
val executor = poolFactory.createActorPool
@@ -493,7 +523,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
493523
}
494524
} else {
495525
getOrConnectAndSend(to.node, LinkMessage(from, to), { channel =>
496-
val set = links.getOrElseUpdate(channel, new NonBlockingHashSet[Link])
526+
val set = links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config))
497527
set.add(link)
498528
})
499529
}
@@ -537,30 +567,30 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
537567
case Some(p : ProcessAdapter) =>
538568
val link = p.registerLink(to)
539569
if (!isLocal(from))
540-
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
570+
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link)
541571
case None =>
542572
if (isLocal(from)) {
543573
log.warn("Try to link non-live process %s to %s", from, to)
544574
val link = Link(from, to)
545575
break(link, 'noproc)
546576
} else {
547-
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to))
577+
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to))
548578
}
549579
}
550580

551581
process(to) match {
552582
case Some(p : ProcessAdapter) =>
553583
val link = p.registerLink(from)
554584
if (!isLocal(to))
555-
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
585+
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link)
556586

557587
case None =>
558588
if (isLocal(to)) {
559589
log.warn("Try to link non-live process %s to %s", to, from)
560590
val link = Link(from, to)
561591
break(link, 'noproc)
562592
} else {
563-
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to))
593+
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to))
564594
}
565595
}
566596
}
@@ -584,7 +614,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
584614
}
585615
} else {
586616
getOrConnectAndSend(nodeOf(monitored), MonitorMessage(monitoring, monitored, ref), { channel =>
587-
val set = monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor])
617+
val set = monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config))
588618
set.add(monitor)
589619
})
590620
}
@@ -611,14 +641,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
611641
log.debug("adding monitor for %s", p)
612642
val monitor = p.registerMonitor(monitoring, ref)
613643
if (!isLocal(monitored))
614-
monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(monitor)
644+
monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config))
615645
case None =>
616646
if (isLocal(monitored)) {
617647
log.warn("Try to monitor non-live process: %s -> %s (%s)", monitoring, monitored, ref)
618648
val monitor = Monitor(monitoring, monitored, ref)
619649
monitorExit(monitor, 'noproc)
620650
} else {
621-
monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(Monitor(monitoring, monitored, ref))
651+
monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)).add(Monitor(monitoring, monitored, ref))
622652
}
623653
}
624654
}

src/main/scala/scalang/NodeConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ case class NodeConfig(
2424
typeFactory : TypeFactory = NoneTypeFactory,
2525
typeEncoder: TypeEncoder = NoneTypeEncoder,
2626
typeDecoder : TypeDecoder = NoneTypeDecoder,
27-
tickTime : Int = 60)
27+
tickTime : Int = 60,
28+
useNBHM: Boolean = true)
2829

2930
object NoneTypeFactory extends TypeFactory {
3031
def createType(name : Symbol, arity : Int, reader : TermReader) = None
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package scalang.util.concurrent;
2+
3+
import java.util.AbstractSet;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
6+
class ConcurrentHashSet[E] extends AbstractSet[E] {
7+
8+
val V = "";
9+
10+
val map = new ConcurrentHashMap[E,Object]
11+
12+
override def add(obj: E) = {
13+
map.putIfAbsent(obj, V) == null;
14+
}
15+
16+
override def contains(obj: Object) = {
17+
map.containsKey(obj);
18+
}
19+
20+
override def remove(obj: Object) = {
21+
map.remove(obj) == V;
22+
}
23+
24+
override def size() = {
25+
map.size();
26+
}
27+
28+
override def clear() {
29+
map.clear();
30+
}
31+
32+
override def iterator() = {
33+
map.keySet().iterator();
34+
}
35+
36+
}

0 commit comments

Comments
 (0)