Class Actor<Message,V>
- java.lang.Object
-
- co.paralleluniverse.actors.ActorImpl<Message>
-
- co.paralleluniverse.actors.Actor<Message,V>
-
- Type Parameters:
Message
- The message type the actor can receive. It is often Object.
V
- The actor's return value type. Use Void if the actor does not return a result.
- All Implemented Interfaces:
ActorBuilder<Message,V>, Joinable<V>, Port<Message>, PortAutoCloseable, ReceivePort<Message>, Stranded, SuspendableCallable<V>, java.io.Serializable, java.lang.AutoCloseable
- Direct Known Subclasses:
BasicActor, BehaviorActor
public abstract class Actor<Message,V> extends ActorImpl<Message> implements SuspendableCallable<V>, ActorBuilder<Message,V>, Joinable<V>, Stranded, ReceivePort<Message>
An actor is a self-contained execution unit: an object running in its own strand and communicating with other actors via messages. An actor has a channel used as a mailbox, and can be monitored for errors.
- See Also:
- Serialized Form
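The description above (a mailbox, logic running on its own strand, and a return value) can be illustrated with a minimal JDK-only sketch. This is not Quasar code: `MiniActor` and `SummingActor` are hypothetical names, a plain thread stands in for a strand, and a `LinkedBlockingQueue` stands in for the mailbox.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** JDK-only model of an actor: a mailbox plus logic running on its own thread. */
abstract class MiniActor<M, V> {
    // Unbounded queue, mirroring the documented default of an unbounded mailbox.
    private final BlockingQueue<M> mailbox = new LinkedBlockingQueue<>();
    private final CompletableFuture<V> result = new CompletableFuture<>();

    /** The actor's logic; its return value becomes the result of get(). */
    protected abstract V doRun() throws InterruptedException;

    /** Blocks until a message is available. */
    protected M receive() throws InterruptedException {
        return mailbox.take();
    }

    /** Delivers a message to the mailbox. */
    void send(M message) {
        mailbox.add(message);
    }

    /** Runs doRun() on a new thread, loosely analogous to spawnThread(). */
    MiniActor<M, V> spawnThread() {
        Thread t = new Thread(() -> {
            try {
                result.complete(doRun());
            } catch (Throwable e) {
                result.completeExceptionally(e); // remembered as the cause of death
            }
        });
        t.setDaemon(true);
        t.start();
        return this;
    }

    /** Waits for the actor to terminate and returns its result. */
    V get() {
        return result.join();
    }
}

/** Sums the integers it receives until a negative number arrives. */
class SummingActor extends MiniActor<Integer, Integer> {
    @Override
    protected Integer doRun() throws InterruptedException {
        int sum = 0;
        for (int m = receive(); m >= 0; m = receive()) {
            sum += m;
        }
        return sum;
    }
}
```

Sending 1, 2, 3 and then -1 to a spawned `SummingActor` makes `get()` return 6. The real class adds lifecycle messages, monitoring and fiber support that this sketch leaves out.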
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class co.paralleluniverse.actors.ActorImpl
ActorImpl.ActorLifecycleListener
-
Nested classes/interfaces inherited from interface co.paralleluniverse.strands.channels.ReceivePort
ReceivePort.EOFException
-
-
Field Summary
-
Fields inherited from class co.paralleluniverse.actors.ActorImpl
flightRecorder, ref
-
-
Constructor Summary
Modifier, Constructor, Description:
protected Actor()
- This constructor must only be called by hot code-swap actors, and never, ever, called by application code.
protected Actor(Strand strand, java.lang.String name, MailboxConfig mailboxConfig)
- Creates a new actor.
Actor(java.lang.String name, MailboxConfig mailboxConfig)
- Creates a new actor.
-
Method Summary
Modifier and Type, Method, Description:
protected void addLifecycleListener(co.paralleluniverse.actors.LifecycleListener listener)
Actor<Message,V> build()
- Constructs a new actor.
protected void checkCodeSwap()
- Tests whether this actor has been upgraded via hot code-swapping.
void checkThrownIn()
- Tests whether an exception has been thrown into this actor, and if so, throws it.
void close()
- Closes the channel so that no more messages can be sent to it.
static <M,V> Actor<M,V> currentActor()
- Returns the actor currently running in the current strand.
protected abstract V doRun()
- An actor must implement this method, which contains the actor's logic.
protected Message filterMessage(java.lang.Object m)
- All messages received from the mailbox are passed to this method.
V get()
V get(long timeout, java.util.concurrent.TimeUnit unit)
static Actor getActor(Strand s)
- Returns the actor associated with the given strand, or null if none is.
protected java.lang.Throwable getDeathCause()
- Returns this actor's cause of death.
co.paralleluniverse.actors.ActorMonitor getMonitor()
java.lang.String getName()
- Returns this actor's name.
int getQueueLength()
- Returns the number of messages currently waiting in the mailbox.
Strand getStrand()
protected Message handleLifecycleMessage(LifecycleMessage m)
- This method is called by this class during a call to any of the receive methods if a LifecycleMessage is found in the mailbox.
static <M> ActorRef<M> hire(ActorRef<M> ref)
- Hires and resumes/restarts a migrated actor.
static <M> ActorRef<M> hire(ActorRef<M> ref, FiberScheduler scheduler)
- Hires and resumes/restarts a migrated actor.
protected void internalSend(java.lang.Object message)
- For internal use.
protected void internalSendNonSuspendable(java.lang.Object message)
protected void interrupt()
- Interrupts the actor's strand.
boolean isClosed()
- Tests whether the channel has been closed and no more messages await in the channel.
boolean isDone()
- Tests whether this actor has terminated.
protected boolean isInActor()
- Tests whether this code is executing in this actor's strand.
boolean isRegistered()
- Tests whether this actor has been registered.
boolean isStarted()
- Tests whether this actor has been started, i.e. whether the strand executing it has been started.
void join()
void join(long timeout, java.util.concurrent.TimeUnit unit)
Actor link(ActorRef other)
- Links this actor to another.
protected void linked(ActorRef actor)
protected Mailbox<java.lang.Object> mailbox()
- Returns this actor's mailbox channel.
protected ActorRef<Message> makeRef(ActorRef<Message> ref)
void migrate()
- Suspends and migrates the actor.
void migrateAndRestart()
- Suspends and migrates the actor in such a way that when it is later hired, the actor is restarted (i.e., its `doRun` method will be called again and run from the top), but the current value of the actor's fields will be preserved.
co.paralleluniverse.actors.ActorMonitor monitor()
- Starts a monitor that exposes information about this actor via a JMX MBean.
protected void monitorAddDeath(java.lang.Object reason)
protected void monitorAddMessage()
protected void monitorResetSkippedMessages()
protected void monitorSkippedMessage()
static <T extends Actor<Message,V>,Message,V> T newActor(ActorSpec<T,Message,V> spec)
- Creates a new actor from an ActorSpec.
static <T extends Actor<Message,V>,Message,V> T newActor(java.lang.Class<T> clazz, java.lang.Object... params)
- Creates a new actor.
protected void onCodeChange()
- This method is called on an actor instance replacing an active instance via hot code-swapping.
protected java.lang.Object readResolve()
Message receive()
- Returns the next message from the mailbox.
Message receive(long timeout, java.util.concurrent.TimeUnit unit)
- Returns the next message from the mailbox.
Message receive(Timeout timeout)
- Returns the next message from the mailbox.
ActorRef<Message> ref()
- Returns the ActorRef to this actor, if it has been started.
Actor register()
- Registers this actor in the actor registry under its name.
Actor<Message,V> register(java.lang.String name)
- Registers this actor in the actor registry under the given name and sets this actor's name.
protected Actor<Message,V> reinstantiate()
- Returns a "clone" of this actor, used by a supervisor to restart this actor if it dies.
protected void removeLifecycleListener(co.paralleluniverse.actors.LifecycleListener listener)
protected void removeObserverListeners(ActorRef actor)
V run()
protected ActorRef<Message> self()
- Returns the ActorRef to this actor, if it has been started.
protected void sendSync(Message message)
Actor<Message,V> setForwardWatch(boolean value)
- Deprecated.
void setMonitor(co.paralleluniverse.actors.ActorMonitor monitor)
- Sets the actor's monitor.
void setName(java.lang.String name)
- Sets this actor's name.
void setStrand(Strand strand)
ActorRef<Message> spawn()
- Starts a new fiber and runs the actor in it.
ActorRef<Message> spawn(FiberFactory ff)
- Starts a new fiber using the given scheduler and runs the actor in it.
ActorRef<Message> spawn(StrandFactory sf)
- Starts a new fiber using the given scheduler and runs the actor in it.
ActorRef<Message> spawnThread()
- Starts a new thread and runs the actor in it.
Actor<Message,V> start()
void stopMonitor()
- Shuts down the actor's monitor.
void throwIn(java.lang.RuntimeException e)
java.lang.String toString()
Message tryReceive()
- Retrieves a message from the mailbox if one is available.
protected boolean trySend(Message message)
Actor unlink(ActorRef other)
- Un-links this actor from another.
protected void unlinked(ActorRef actor)
Actor unregister()
- Unregisters this actor from the actor registry.
void unwatch(ActorRef other, java.lang.Object watchId)
- Un-watches another actor.
protected void verifyInActor()
- Tests whether this code is executing in this actor's strand, and throws a ConcurrencyException if not.
protected void verifyOnActorStrand()
java.lang.Object watch(ActorRef other)
- Makes this actor watch another actor.
protected java.lang.Object writeReplace()
-
Methods inherited from class co.paralleluniverse.actors.ActorImpl
close, getLifecycleListener, getMailbox, isRecordingLevel, record, record, record, record, record, record, record, sendOrInterrupt
-
-
-
-
Constructor Detail
-
Actor
public Actor(java.lang.String name, MailboxConfig mailboxConfig)
Creates a new actor.
- Parameters:
name
- the actor name (may be null).
mailboxConfig
- the actor's mailbox settings; if null, the default config (an unbounded mailbox) will be used.
-
Actor
protected Actor()
This constructor must only be called by hot code-swap actors, and never, ever, called by application code.
-
Actor
protected Actor(Strand strand, java.lang.String name, MailboxConfig mailboxConfig)
Creates a new actor.
- Parameters:
strand
- the actor's strand
name
- the actor name (may be null).
mailboxConfig
- the actor's mailbox settings; if null, the default config (an unbounded mailbox) will be used.
-
-
Method Detail
-
newActor
public static <T extends Actor<Message,V>,Message,V> T newActor(java.lang.Class<T> clazz, java.lang.Object... params)
Creates a new actor. The actor must have a public constructor that can take the given parameters.
- Type Parameters:
T
- The actor's type.
Message
- The actor's message type.
V
- The actor's return value type.
- Parameters:
clazz
- The actor's class.
params
- Parameters that will be passed to the actor class's constructor in order to construct a new instance.
- Returns:
- A new actor of type T.
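The requirement of "a public constructor that can take the given parameters" can be pictured with plain reflection. The sketch below is only one plausible reading of that requirement, not the library's implementation; `ReflectiveFactory` and its methods are hypothetical names, and constructors with primitive parameter types are deliberately not matched to keep it short.

```java
import java.lang.reflect.Constructor;

/** Hypothetical helper, not part of the Actor API. */
final class ReflectiveFactory {
    private ReflectiveFactory() {}

    /** Instantiates clazz through the first public constructor that accepts params. */
    static <T> T newInstance(Class<T> clazz, Object... params) {
        for (Constructor<?> c : clazz.getConstructors()) { // public constructors only
            if (accepts(c.getParameterTypes(), params)) {
                try {
                    return clazz.cast(c.newInstance(params));
                } catch (ReflectiveOperationException e) {
                    throw new IllegalArgumentException("constructor failed: " + c, e);
                }
            }
        }
        throw new IllegalArgumentException(
                "no public constructor of " + clazz.getName() + " accepts the given parameters");
    }

    /** True if every argument is an instance of the corresponding parameter type. */
    private static boolean accepts(Class<?>[] types, Object[] params) {
        if (types.length != params.length) {
            return false;
        }
        for (int i = 0; i < types.length; i++) {
            // A null argument fits any reference type; primitives are never matched here.
            boolean fits = params[i] == null ? !types[i].isPrimitive() : types[i].isInstance(params[i]);
            if (!fits) {
                return false;
            }
        }
        return true;
    }
}
```

For example, `ReflectiveFactory.newInstance(StringBuilder.class, "abc")` builds a `StringBuilder` containing `abc`, while passing two strings fails because no two-argument public constructor exists.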
-
newActor
public static <T extends Actor<Message,V>,Message,V> T newActor(ActorSpec<T,Message,V> spec)
Creates a new actor from an ActorSpec.
- Type Parameters:
T
- The actor's type.
Message
- The actor's message type.
V
- The actor's return value type.
- Parameters:
spec
- The ActorSpec that defines how to build the actor.
- Returns:
- A new actor of type T.
-
setForwardWatch
@Deprecated public Actor<Message,V> setForwardWatch(boolean value)
Deprecated.
For use by non-Java, untyped languages only.
If set to true, LifecycleMessage handling will, by default, cause ExitMessages from watched actors to be returned by receive. This means that receive will return a message of a type that may not be Message, and therefore this value should only be set to true in untyped languages.
- Parameters:
value
- Returns:
-
getName
public java.lang.String getName()
Returns this actor's name.
-
setName
public void setName(java.lang.String name)
Sets this actor's name. The name does not have to be unique, and may be null.
-
spawn
public ActorRef<Message> spawn(StrandFactory sf)
Starts a new strand created by the given StrandFactory and runs the actor in it. The strand's name will be set to this actor's name.
-
spawn
public ActorRef<Message> spawn(FiberFactory ff)
Starts a new fiber created by the given FiberFactory and runs the actor in it. The fiber's name will be set to this actor's name.
-
spawn
public ActorRef<Message> spawn()
Starts a new fiber and runs the actor in it. The fiber's name will be set to this actor's name.
- Returns:
- This actor's ActorRef
-
spawnThread
public ActorRef<Message> spawnThread()
Starts a new thread and runs the actor in it. The thread's name will be set to this actor's name.
- Returns:
- This actor's ActorRef
-
run
public final V run() throws java.lang.InterruptedException, SuspendExecution
- Specified by:
run in interface SuspendableCallable<V>
- Throws:
java.lang.InterruptedException
SuspendExecution
-
reinstantiate
protected Actor<Message,V> reinstantiate()
Returns a "clone" of this actor, used by a supervisor to restart this actor if it dies.
If this actor is supervised by a supervisor and was not created with the newActor factory method, then this method should be overridden.
- Returns:
- A new Actor instance that is a clone of this one.
-
interrupt
protected final void interrupt()
Interrupts the actor's strand.
-
currentActor
public static <M,V> Actor<M,V> currentActor()
Returns the actor currently running in the current strand.
-
self
protected ActorRef<Message> self()
Returns the ActorRef to this actor, if it has been started.
- Returns:
- the ActorRef of this actor if it has been started, or null otherwise.
-
getQueueLength
public final int getQueueLength()
Returns the number of messages currently waiting in the mailbox.
-
mailbox
protected final Mailbox<java.lang.Object> mailbox()
Returns this actor's mailbox channel.
-
internalSend
protected void internalSend(java.lang.Object message)
Description copied from class: ActorImpl
For internal use.
- Specified by:
internalSend in class ActorImpl<Message>
-
internalSendNonSuspendable
protected void internalSendNonSuspendable(java.lang.Object message)
- Specified by:
internalSendNonSuspendable in class ActorImpl<Message>
-
sendSync
protected final void sendSync(Message message) throws SuspendExecution
- Overrides:
sendSync in class ActorImpl<Message>
- Throws:
SuspendExecution
-
trySend
protected final boolean trySend(Message message)
-
receive
public final Message receive() throws SuspendExecution, java.lang.InterruptedException
Returns the next message from the mailbox. If no message is currently available, this method blocks until a message arrives.
- Specified by:
receive in interface ReceivePort<Message>
- Returns:
- a message sent to this actor.
- Throws:
java.lang.InterruptedException
SuspendExecution
-
receive
public final Message receive(long timeout, java.util.concurrent.TimeUnit unit) throws SuspendExecution, java.lang.InterruptedException
Returns the next message from the mailbox. If no message is currently available, this method blocks until a message arrives, but no longer than the given timeout.
- Specified by:
receive in interface ReceivePort<Message>
- Parameters:
timeout
- the maximum duration to block waiting for a message.
unit
- the time unit of the timeout.
- Returns:
- a message sent to this actor, or null if the timeout has expired.
- Throws:
java.lang.InterruptedException
SuspendExecution
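The timed-receive contract (return a message if one arrives in time, otherwise null) matches what `BlockingQueue.poll(timeout, unit)` offers in the JDK. The sketch below models only that contract; `TimedMailbox` is a hypothetical name and threads stand in for strands, so it says nothing about how the actor mailbox is actually implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only model of the timed receive contract. */
final class TimedMailbox<M> {
    private final BlockingQueue<M> queue = new LinkedBlockingQueue<>();

    void send(M message) {
        queue.add(message);
    }

    /** Waits up to the given timeout; returns null if no message arrived in time. */
    M receive(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    /** Never blocks; returns null if no message is immediately available. */
    M tryReceive() {
        return queue.poll();
    }

    /** Small demo: the first receive finds a message, the second times out. */
    static String demo() {
        TimedMailbox<String> mailbox = new TimedMailbox<>();
        mailbox.send("hello");
        try {
            String first = mailbox.receive(50, TimeUnit.MILLISECONDS);
            String second = mailbox.receive(50, TimeUnit.MILLISECONDS);
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "interrupted";
        }
    }
}
```

Because null doubles as the "timed out" signal, callers of this kind of API need a null check before using the returned message.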
-
receive
public final Message receive(Timeout timeout) throws SuspendExecution, java.lang.InterruptedException
Returns the next message from the mailbox. If no message is currently available, this method blocks until a message arrives, but no longer than the given timeout.
- Specified by:
receive in interface ReceivePort<Message>
- Parameters:
timeout
- the method will not block for longer than the amount remaining in the Timeout
- Returns:
- a message sent to this actor, or null if the timeout has expired.
- Throws:
java.lang.InterruptedException
SuspendExecution
-
tryReceive
public final Message tryReceive()
Retrieves a message from the mailbox if one is available. This method never blocks.
- Specified by:
tryReceive in interface ReceivePort<Message>
- Returns:
- a message, or null if one is not immediately available.
-
filterMessage
protected Message filterMessage(java.lang.Object m)
All messages received from the mailbox are passed to this method. If this method returns a non-null value, this value will be returned from the receive methods. If it returns null, then receive will keep waiting.
By default, this method passes all LifecycleMessage messages to handleLifecycleMessage, while other messages are returned (and will be returned by receive).
- Parameters:
m
- the message
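The filtering rule above can be modelled as a loop that keeps taking raw messages until the filter yields a non-null value. The following is a single-threaded, non-blocking sketch under stated assumptions: `FilteringMailbox` and its nested `Lifecycle` class are hypothetical stand-ins, and where the real receive would keep waiting, this model simply moves on to the next queued message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** JDK-only model of the filterMessage contract. */
class FilteringMailbox {
    /** Hypothetical stand-in for a lifecycle message. */
    static final class Lifecycle {
        final String info;
        Lifecycle(String info) { this.info = info; }
    }

    private final Queue<Object> mailbox = new ArrayDeque<>();
    final List<String> handledLifecycle = new ArrayList<>();

    void send(Object message) {
        mailbox.add(message);
    }

    /** Returns the value receive should hand out, or null to swallow the message. */
    protected String filterMessage(Object m) {
        if (m instanceof Lifecycle) {
            return handleLifecycleMessage((Lifecycle) m);
        }
        return (String) m;
    }

    /** Records the lifecycle message and returns null, meaning "already processed". */
    protected String handleLifecycleMessage(Lifecycle m) {
        handledLifecycle.add(m.info);
        return null;
    }

    /** Skips messages the filter swallowed; returns null once the mailbox is empty. */
    String tryReceive() {
        Object raw;
        while ((raw = mailbox.poll()) != null) {
            String filtered = filterMessage(raw);
            if (filtered != null) {
                return filtered;
            }
        }
        return null;
    }
}
```

With a `Lifecycle` message queued ahead of the string `"work"`, a single `tryReceive()` call handles the lifecycle message internally and returns `"work"`.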
-
isClosed
public final boolean isClosed()
Description copied from interface: PortAutoCloseable
Tests whether the channel has been closed and no more messages await in the channel. If this method returns true, all future calls to receive are guaranteed to return null, and calls to receive on a primitive channel will throw an EOFException.
- Specified by:
isClosed in interface PortAutoCloseable
- Returns:
true if the channel has been closed and no more messages will be received; false otherwise.
-
close
public void close()
Description copied from interface: PortAutoCloseable
Closes the channel so that no more messages can be sent to it. Messages already sent to the channel will still be received.
- Specified by:
close in interface java.lang.AutoCloseable
- Specified by:
close in interface PortAutoCloseable
- Overrides:
close in class ActorImpl<Message>
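The interplay between close() and isClosed() is easy to misread: closing stops new sends, but isClosed() only reports true once the messages already queued have been drained. A minimal single-threaded model, with the hypothetical name `ClosableMailbox`, might look as follows. What happens to a message sent after close (here it is rejected with a false return) is an assumption of the sketch, not something the text above specifies.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** JDK-only model of the close()/isClosed() contract. */
final class ClosableMailbox<M> {
    private final Queue<M> queue = new ArrayDeque<>();
    private boolean sendClosed;

    /** Returns false if the mailbox no longer accepts messages (sketch assumption). */
    boolean send(M message) {
        if (sendClosed) {
            return false;
        }
        queue.add(message);
        return true;
    }

    /** Stops accepting new messages; queued messages remain receivable. */
    void close() {
        sendClosed = true;
    }

    /** Never blocks; returns null if nothing is queued. */
    M tryReceive() {
        return queue.poll();
    }

    /** True only when closed AND fully drained. */
    boolean isClosed() {
        return sendClosed && queue.isEmpty();
    }
}
```

After `send("a")` followed by `close()`, `isClosed()` is still false; it becomes true only after `"a"` has been received.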
-
get
public final V get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
-
get
public final V get(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException
-
join
@Suspendable public final void join() throws java.util.concurrent.ExecutionException, java.lang.InterruptedException
-
join
@Suspendable public final void join(long timeout, java.util.concurrent.TimeUnit unit) throws java.util.concurrent.ExecutionException, java.lang.InterruptedException, java.util.concurrent.TimeoutException
-
isStarted
public final boolean isStarted()
Tests whether this actor has been started, i.e. whether the strand executing it has been started.
-
isDone
public final boolean isDone()
Tests whether this actor has terminated.
-
verifyInActor
protected final void verifyInActor()
Tests whether this code is executing in this actor's strand, and throws a ConcurrencyException if not.
- See Also:
isInActor()
-
verifyOnActorStrand
protected final void verifyOnActorStrand()
-
isInActor
protected final boolean isInActor()
Tests whether this code is executing in this actor's strand.
-
getActor
public static Actor getActor(Strand s)
Returns the actor associated with the given strand, or null if none is.
-
doRun
protected abstract V doRun() throws java.lang.InterruptedException, SuspendExecution
An actor must implement this method, which contains the actor's logic. This method begins executing on the actor's strand.
Upon a hot code-swap, this method is re-executed, so it is this method's responsibility to check this actor's state (which may not be blank after a code-swap) when it begins.
- Returns:
- The actor's return value, which can be obtained with get().
- Throws:
java.lang.InterruptedException
SuspendExecution
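The note about re-execution means doRun should not assume it starts from a blank slate. The JDK-only sketch below illustrates the idea: progress lives in a field, a `Restart` error (a hypothetical stand-in for the special Error mentioned for checkCodeSwap) unwinds the first run, and the second run resumes from the saved progress. The real mechanism creates a new instance and copies fields into it; this model reuses one object for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of a doRun() that tolerates being re-executed from the top. */
final class RestartableWorker {
    /** Hypothetical signal meaning "abandon this run and start doRun() again". */
    static final class Restart extends Error {
        private static final long serialVersionUID = 1L;
    }

    private final List<Integer> done = new ArrayList<>(); // state that survives the restart
    private boolean swapPending = true;                   // pretend exactly one swap is waiting
    int runs;                                             // how many times doRun() started

    /** Does not return normally if a simulated swap is pending after two items. */
    private void checkCodeSwap() {
        if (swapPending && done.size() == 2) {
            swapPending = false;
            throw new Restart();
        }
    }

    /** Resumes from the recorded progress instead of assuming blank state. */
    List<Integer> doRun() {
        runs++;
        for (int i = done.size(); i < 4; i++) {
            checkCodeSwap();
            done.add(i);
        }
        return done;
    }

    /** Re-executes doRun() whenever a Restart unwinds it. */
    List<Integer> run() {
        while (true) {
            try {
                return doRun();
            } catch (Restart restart) {
                // loop around and call doRun() again from the top
            }
        }
    }
}
```

Here `run()` ends with the items 0 through 3 processed exactly once each, even though doRun() was entered twice. Had doRun() started its loop at 0 unconditionally, items 0 and 1 would have been processed twice.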
-
handleLifecycleMessage
protected Message handleLifecycleMessage(LifecycleMessage m)
This method is called by this class during a call to any of the receive methods if a LifecycleMessage is found in the mailbox. By default, if the message is an ExitMessage and its watch is null, i.e. it is the result of a link rather than a watch, it will throw a LifecycleException, which will, in turn, cause this exception to be thrown by the call to receive. This method is not allowed to block. If you want to block as a result of a lifecycle message, return the message from this method (rather than returning null), and have it processed by the caller of receive.
- Parameters:
m
- the message
- Returns:
null if the message has been processed and should not be returned by receive
-
checkCodeSwap
protected void checkCodeSwap() throws SuspendExecution
Tests whether this actor has been upgraded via hot code-swapping. If a new version of this actor is found, this method never returns (a special Error is thrown which causes the actor to restart).
- Throws:
SuspendExecution
-
onCodeChange
protected void onCodeChange()
This method is called on an actor instance replacing an active instance via hot code-swapping. When this method is called, the fields of the old instance have been shallow-copied to this instance, but this instance has not yet started to run. This method should initialize any relevant state not copied from the old instance.
-
addLifecycleListener
protected final void addLifecycleListener(co.paralleluniverse.actors.LifecycleListener listener)
- Specified by:
addLifecycleListener in class ActorImpl<Message>
-
removeLifecycleListener
protected void removeLifecycleListener(co.paralleluniverse.actors.LifecycleListener listener)
- Specified by:
removeLifecycleListener in class ActorImpl<Message>
-
removeObserverListeners
protected void removeObserverListeners(ActorRef actor)
- Specified by:
removeObserverListeners in class ActorImpl<Message>
-
getDeathCause
protected final java.lang.Throwable getDeathCause()
Returns this actor's cause of death.
- Returns:
- the Throwable that caused this actor's death, or null if it died by natural causes, or if it is not dead.
-
isRegistered
public final boolean isRegistered()
Tests whether this actor has been registered.
- Returns:
true if the actor is registered; false otherwise.
-
throwIn
public final void throwIn(java.lang.RuntimeException e)
-
checkThrownIn
public final void checkThrownIn()
Tests whether an exception has been thrown into this actor, and if so, throws it. This method must only be called within the actor's strand.
-
link
public final Actor link(ActorRef other)
Links this actor to another. A link is symmetrical. When two actors are linked and one of them dies, the other receives an ExitMessage, which is handled by handleLifecycleMessage, which, by default, throws a LifecycleException in response. The exception will be thrown by any of the receive methods.
- Parameters:
other
- the other actor
- Returns:
this
- See Also:
watch(ActorRef), unlink(ActorRef)
-
unlink
public final Actor unlink(ActorRef other)
Un-links this actor from another. This operation is symmetric.
- Parameters:
other
- the other actor
- Returns:
this
- See Also:
link(ActorRef)
-
unlinked
protected void unlinked(ActorRef actor)
-
watch
public final java.lang.Object watch(ActorRef other)
Makes this actor watch another actor. When the other actor dies, this actor receives an ExitMessage, which is handled by handleLifecycleMessage. This message does not cause an exception to be thrown, unlike the case where it is received as a result of a linked actor's death.
Unlike a link, a watch is asymmetric, and it is also composable: calling this method twice with the same argument results in two different values being returned, and in an ExitMessage being received twice.
- Parameters:
other
- the other actor
- Returns:
- a watchId object that identifies this watch in messages, and is used to remove the watch with the unwatch method.
- See Also:
link(ActorRef), unwatch(ActorRef, Object)
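The difference between a link (symmetric, death of a peer raises an exception) and a watch (asymmetric, composable, identified by a watchId) can be shown with a small bookkeeping model. Everything below is a JDK-only sketch with hypothetical names (`LinkWatchModel`, `Node`, `Exit`); in particular, handing watch notifications straight to the caller of `receive()` is a simplification, since in the real class that decision belongs to handleLifecycleMessage.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** JDK-only model of link vs. watch bookkeeping. */
final class LinkWatchModel {
    private LinkWatchModel() {}

    /** Stand-in for an exit notification: who died, and the watch id (null for a link). */
    static final class Exit {
        final Node dead;
        final Object watchId;
        Exit(Node dead, Object watchId) { this.dead = dead; this.watchId = watchId; }
    }

    private static final class Watch {
        final Node watcher;
        final Object id;
        Watch(Node watcher, Object id) { this.watcher = watcher; this.id = id; }
    }

    static final class Node {
        final String name;
        private final Set<Node> links = new HashSet<>();
        private final List<Watch> watchers = new ArrayList<>();
        private final List<Exit> inbox = new ArrayList<>();

        Node(String name) { this.name = name; }

        /** Symmetric: both sides record the link. */
        Node link(Node other) {
            links.add(other);
            other.links.add(this);
            return this;
        }

        /** Asymmetric and composable: every call registers a new watch with its own id. */
        Object watch(Node other) {
            Object watchId = new Object();
            other.watchers.add(new Watch(this, watchId));
            return watchId;
        }

        void unwatch(Node other, Object watchId) {
            other.watchers.removeIf(w -> w.watcher == this && w.id == watchId);
        }

        /** Notifies linked peers (watchId == null) and every registered watch. */
        void die() {
            for (Node peer : links) {
                peer.links.remove(this);
                peer.inbox.add(new Exit(this, null));
            }
            links.clear();
            for (Watch w : watchers) {
                w.watcher.inbox.add(new Exit(this, w.id));
            }
            watchers.clear();
        }

        /** Link exits throw; watch exits are returned (sketch simplification). */
        Exit receive() {
            if (inbox.isEmpty()) {
                return null;
            }
            Exit e = inbox.remove(0);
            if (e.watchId == null) {
                throw new IllegalStateException(e.dead.name + " died");
            }
            return e;
        }
    }
}
```

If node `c` watches node `b` twice, it gets two distinct ids and, when `b` dies, two notifications carrying those ids; a node linked to `b` instead sees an exception from `receive()`.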
-
unwatch
public final void unwatch(ActorRef other, java.lang.Object watchId)
Un-watches another actor.
- Parameters:
other
- the other actor
watchId
- the object returned from the call to watch(other)
- See Also:
watch(ActorRef)
-
register
public final Actor<Message,V> register(java.lang.String name) throws SuspendExecution
Registers this actor in the actor registry under the given name and sets this actor's name. This also creates a monitor for this actor.
- Parameters:
name
- the name of the actor in the registry; it must be equal to the actor's name if it has one.
- Returns:
this
- Throws:
SuspendExecution
-
register
public final Actor register() throws SuspendExecution
Registers this actor in the actor registry under its name. This also creates a monitor for this actor.
- Returns:
this
- Throws:
SuspendExecution
-
unregister
public final Actor unregister()
Unregisters this actor from the actor registry.
- Returns:
this
-
migrateAndRestart
public void migrateAndRestart() throws SuspendExecution
Suspends and migrates the actor in such a way that when it is later hired, the actor is restarted (i.e., its `doRun` method will be called again and run from the top), but the current value of the actor's fields will be preserved. This method never returns.
- Throws:
SuspendExecution
-
migrate
public void migrate() throws SuspendExecution
Suspends and migrates the actor. This method suspends the fiber the actor is running in (and is therefore available only for actors running in fibers), so that when the actor is hired, it will continue execution from the point this method was called. This method must be called on a fiber.
- Throws:
SuspendExecution
-
hire
public static <M> ActorRef<M> hire(ActorRef<M> ref) throws SuspendExecution
Hires and resumes/restarts a migrated actor.
- Parameters:
ref
- the ActorRef of the migrated actor.
- Returns:
- the ref
- Throws:
SuspendExecution
-
hire
public static <M> ActorRef<M> hire(ActorRef<M> ref, FiberScheduler scheduler) throws SuspendExecution
Hires and resumes/restarts a migrated actor.
- Parameters:
ref
- the ActorRef of the migrated actor.
scheduler
- the FiberScheduler on which to schedule this actor, or null to schedule the actor on a thread.
- Returns:
- the ref
- Throws:
SuspendExecution
-
build
public final Actor<Message,V> build() throws SuspendExecution
Description copied from interface: ActorBuilder
Constructs a new actor.
- Specified by:
build in interface ActorBuilder<Message,V>
- Returns:
- a newly created actor.
- Throws:
SuspendExecution
-
writeReplace
protected final java.lang.Object writeReplace() throws java.io.ObjectStreamException
- Throws:
java.io.ObjectStreamException
-
readResolve
protected java.lang.Object readResolve() throws java.io.ObjectStreamException
- Throws:
java.io.ObjectStreamException
-
monitor
public final co.paralleluniverse.actors.ActorMonitor monitor()
Starts a monitor that exposes information about this actor via a JMX MBean.
- Returns:
- the monitor
-
setMonitor
public final void setMonitor(co.paralleluniverse.actors.ActorMonitor monitor)
Sets the actor's monitor.
- Parameters:
monitor
- the monitor
-
stopMonitor
public final void stopMonitor()
Shuts down the actor's monitor.
-
getMonitor
public final co.paralleluniverse.actors.ActorMonitor getMonitor()
-
monitorAddDeath
protected final void monitorAddDeath(java.lang.Object reason)
-
monitorAddMessage
protected final void monitorAddMessage()
-
monitorSkippedMessage
protected final void monitorSkippedMessage()
-
monitorResetSkippedMessages
protected final void monitorResetSkippedMessages()
-
-