Skip to content

Commit

Permalink
fix #110
Browse files Browse the repository at this point in the history
  • Loading branch information
pron committed Aug 26, 2015
1 parent 41c03a7 commit 092ae88
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public static <T extends Actor<Message, V>, Message, V> T newActor(ActorSpec<T,
private volatile V result;
private volatile Throwable exception;
private volatile Throwable deathCause;
private Object globalId;
private volatile Object registrationId;
private volatile Object globalId;
private transient volatile ActorMonitor monitor;
private volatile boolean registered;
private boolean hasMonitor;
Expand Down Expand Up @@ -830,7 +829,11 @@ public final boolean isRegistered() {
}

Object getGlobalId() {
return globalId != null ? globalId : registrationId;
return globalId;
}

void setGlobalId(Object globalId) {
this.globalId = globalId;
}

@Override
Expand Down Expand Up @@ -977,7 +980,7 @@ public final Actor register() throws SuspendExecution {
if (registered)
return this;
record(1, "Actor", "register", "Registering actor %s as %s", this, getName());
this.registrationId = ActorRegistry.register(this, null);
ActorRegistry.register(this);
return this;
}

Expand All @@ -992,7 +995,7 @@ public final Actor unregister() {
record(1, "Actor", "unregister", "Unregistering actor %s (name: %s)", this, getName());
if (getName() == null)
throw new IllegalArgumentException("name is null");
ActorRegistry.unregister(ref());
ActorRegistry.unregister(this);
if (monitor != null)
this.monitor.setActor(null);
this.registered = false;
Expand Down Expand Up @@ -1180,7 +1183,6 @@ public final Actor<Message, V> build() throws SuspendExecution {
protected final Object writeReplace() throws java.io.ObjectStreamException {
if (migrating)
return this;

final RemoteActor<Message> remote = RemoteActorProxyFactoryService.create(ref(), getGlobalId());
// remote.startReceiver();
return remote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void send(Message message) throws SuspendExecution {
x.throwIn(e);
}
} catch (RuntimeException e) {
e.printStackTrace();
LostActor.instance.ref().send(message);
LostActor.instance.throwIn(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down Expand Up @@ -42,24 +42,23 @@ public class ActorRegistry {

}

static Object register(Actor<?, ?> actor, Object globalId) throws SuspendExecution {
static <Message> void register(Actor<Message, ?> actor) throws SuspendExecution {
final String name = actor.getName();
if (name == null)
throw new IllegalArgumentException("name is null");
LOG.info("Registering {}: {}", name, actor);

actor.preRegister(name);
final Object res = registry.register(actor.ref0(), globalId);
registry.register(actor, actor.ref0());
actor.postRegister();

actor.monitor();
return res;
}

static void unregister(final ActorRef<?> actor) {
static <Message> void unregister(Actor<Message, ?> actor) {
LOG.info("Unregistering actor: {}", actor.getName());

registry.unregister(actor);
registry.unregister(actor, actor.ref());
}

/**
Expand All @@ -68,8 +67,8 @@ static void unregister(final ActorRef<?> actor) {
* @param name the actor's name.
* @return the actor, or {@code null} if no actor by that name is currently registered.
*/
public static <Message> ActorRef<Message> tryGetActor(String name) throws SuspendExecution {
return registry.tryGetActor(name);
public static <T extends ActorRef<?>> T tryGetActor(String name) throws SuspendExecution {
return (T) registry.tryGetActor(name);
}

/**
Expand All @@ -80,8 +79,8 @@ public static <Message> ActorRef<Message> tryGetActor(String name) throws Suspen
* @param unit the timeout's unit
* @return the actor, or {@code null} if the timeout expires before one is registered.
*/
public static <Message> ActorRef<Message> getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution {
return registry.getActor(name, timeout, unit);
public static <T extends ActorRef<?>> T getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution {
return (T) registry.getActor(name, timeout, unit);
}

/**
Expand All @@ -90,7 +89,7 @@ public static <Message> ActorRef<Message> getActor(String name, long timeout, Ti
* @param name the actor's name.
* @return the actor.
*/
public static <Message> ActorRef<Message> getActor(String name) throws InterruptedException, SuspendExecution {
public static <T extends ActorRef<?>> T getActor(String name) throws InterruptedException, SuspendExecution {
return getActor(name, 0, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@
*
* @author pron
*/
class LocalActorRegistry implements co.paralleluniverse.actors.spi.ActorRegistry {
class LocalActorRegistry extends co.paralleluniverse.actors.spi.ActorRegistry {
private static final Logger LOG = LoggerFactory.getLogger(LocalActorRegistry.class);
private final ConcurrentMap<String, ActorRef<?>> registeredActors = MapUtil.newConcurrentHashMap();
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();

@Override
@Suspendable
public Object register(ActorRef<?> actor, Object globalId) {
final String name = actor.getName();
public <Message> void register(Actor<Message, ?> actor, ActorRef<Message> actorRef) {
final String name = actorRef.getName();
if (name == null)
throw new IllegalArgumentException("name is null");

lock.lock();
try {
final ActorRef<?> old = registeredActors.get(name);
if (old != null && Objects.equal(old, actor))
return globalId;
if (old != null && Objects.equal(old, actorRef))
return;

if (old != null && LocalActor.isLocal(old) && !LocalActor.isDone(old))
throw new RegistrationException("Actor " + old + " is not dead and is already registered under " + name);
Expand All @@ -58,25 +58,23 @@ public Object register(ActorRef<?> actor, Object globalId) {
if (old != null && !registeredActors.remove(name, old))
throw new RegistrationException("Concurrent registration under the name " + name);

if (registeredActors.putIfAbsent(name, actor) != null)
if (registeredActors.putIfAbsent(name, actorRef) != null)
throw new RegistrationException("Concurrent registration under the name " + name);

cond.signalAll();
} finally {
lock.unlock();
}
LOG.info("Registering {}: {}", name, actor);

return globalId;
LOG.info("Registering {}: {}", name, actorRef);
}

@Override
public void unregister(ActorRef<?> actor) {
registeredActors.remove(actor.getName());
public <Message> void unregister(Actor<Message, ?> actor, ActorRef<Message> actorRef) {
registeredActors.remove(actorRef.getName());
}

@Override
public <Message> ActorRef<Message> tryGetActor(final String name) throws SuspendExecution {
public ActorRef<?> tryGetActor(final String name) throws SuspendExecution {
ActorRef<?> actor = registeredActors.get(name);
if (actor == null) {
lock.lock();
Expand All @@ -86,16 +84,16 @@ public <Message> ActorRef<Message> tryGetActor(final String name) throws Suspend
lock.unlock();
}
}
return (ActorRef<Message>) actor;
return actor;
}

@Override
public <Message> ActorRef<Message> getActor(final String name) throws InterruptedException, SuspendExecution {
public ActorRef<?> getActor(final String name) throws InterruptedException, SuspendExecution {
return getActor(name, 0, null);
}

@Override
public <Message> ActorRef<Message> getActor(final String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution {
public ActorRef<?> getActor(final String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution {
ActorRef<?> actor = registeredActors.get(name);
if (actor == null) {
final long deadline = unit != null ? System.nanoTime() + unit.toNanos(timeout) : 0;
Expand All @@ -119,16 +117,16 @@ public <Message> ActorRef<Message> getActor(final String name, long timeout, Tim
lock.unlock();
}
}
return (ActorRef<Message>) actor;
return actor;
}

@Override
public <Message> ActorRef<Message> getOrRegisterActor(final String name, Callable<ActorRef<Message>> actorFactory) throws SuspendExecution {
ActorRef<?> actor = registeredActors.get(name);
public <T extends ActorRef<?>> T getOrRegisterActor(final String name, Callable<T> actorFactory) throws SuspendExecution {
T actor = (T)registeredActors.get(name);
if (actor == null) {
lock.lock();
try {
actor = registeredActors.get(name);
actor = (T)registeredActors.get(name);
if (actor == null) {
try {
actor = actorFactory.call();
Expand All @@ -141,9 +139,9 @@ public <Message> ActorRef<Message> getOrRegisterActor(final String name, Callabl
lock.unlock();
}
}
return (ActorRef<Message>) actor;
return actor;
}

/**
* Use only in tests!
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,63 @@
*/
package co.paralleluniverse.actors.spi;

import co.paralleluniverse.actors.Actor;
import co.paralleluniverse.actors.ActorRef;
import co.paralleluniverse.fibers.SuspendExecution;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static co.paralleluniverse.common.reflection.ReflectionUtil.accessible;
import java.lang.reflect.InvocationTargetException;

/**
*
* @author pron
*/
public interface ActorRegistry {
Object register(ActorRef<?> actor, Object globalId) throws SuspendExecution;
public abstract class ActorRegistry {
public abstract <Message> void register(Actor<Message, ?> actor, ActorRef<Message> actorRef) throws SuspendExecution;

void unregister(ActorRef<?> actor);
public abstract <Message> void unregister(Actor<Message, ?> actor, ActorRef<Message> actorRef);

<Message> ActorRef<Message> getActor(String name) throws InterruptedException, SuspendExecution;
public abstract ActorRef<?> getActor(String name) throws InterruptedException, SuspendExecution;

<Message> ActorRef<Message> getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution;
public abstract ActorRef<?> getActor(String name, long timeout, TimeUnit unit) throws InterruptedException, SuspendExecution;

<Message> ActorRef<Message> tryGetActor(String name) throws SuspendExecution;
public abstract ActorRef<?> tryGetActor(String name) throws SuspendExecution;

<Message> ActorRef<Message> getOrRegisterActor(String name, Callable<ActorRef<Message>> factory) throws SuspendExecution;
public abstract <T extends ActorRef<?>> T getOrRegisterActor(String name, Callable<T> factory) throws SuspendExecution;

void shutdown();
public abstract void shutdown();

protected Object getGlobalId(Actor<?, ?> actor) {
try {
return getGlobalId.invoke(actor);
} catch (InvocationTargetException e) {
throw new RuntimeException(e.getCause());
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}

protected void setGlobalId(Actor<?, ?> actor, Object globalId) {
try {
setGlobalId.invoke(actor, globalId);
} catch (InvocationTargetException e) {
throw new RuntimeException(e.getCause());
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}

private static final Method getGlobalId;
private static final Method setGlobalId;

static {
try {
getGlobalId = accessible(Actor.class.getDeclaredMethod("getGlobalId"));
setGlobalId = accessible(Actor.class.getDeclaredMethod("setGlobalId", Object.class));
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand All @@ -21,6 +21,28 @@
* @author pron
*/
public class Grid {
public static Grid getInstance() throws InterruptedException {
try {
return LazyHolder.instance;
} catch (RuntimeException e) {
if (e.getCause() instanceof InterruptedException)
throw (InterruptedException) e.getCause();
throw e;
}
}

private static class LazyHolder {
private static Grid instance;

static {
try {
instance = new Grid(co.paralleluniverse.galaxy.Grid.getInstance());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private final co.paralleluniverse.galaxy.Grid grid;
private final Store store;
private final Messenger messenger;
Expand All @@ -29,7 +51,7 @@ public class Grid {
RemoteInit.init();
}

public Grid(co.paralleluniverse.galaxy.Grid grid) {
private Grid(co.paralleluniverse.galaxy.Grid grid) {
this.grid = grid;
this.store = new StoreImpl(grid.store());
this.messenger = new MessengerImpl(grid.messenger());
Expand Down Expand Up @@ -68,4 +90,8 @@ public Cluster cluster() {
public void goOnline() throws InterruptedException {
grid.goOnline();
}

public co.paralleluniverse.galaxy.Grid getDelegate() {
return grid;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down Expand Up @@ -531,6 +531,13 @@ public interface Store {
*/
ItemState getState(long id);

/**
* Item version
*
* @param id The item's ID.
*/
long getVersion(long id);

/**
* Sends a message to an item, which will be received by {@link CacheListener#messageReceived(byte[]) CacheListener.messageReceived}
* on the item's owning node.
Expand Down
Loading

0 comments on commit 092ae88

Please sign in to comment.