Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Serializable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -93,8 +92,6 @@ public interface DiscoveryCustomMessage extends Serializable {
public boolean isMutable();

/**
* See {@link DiscoverySpiCustomMessage#stopProcess()}.
*
* @return {@code True} if message should not be sent to others nodes after it was processed on coordinator.
*/
public default boolean stopProcess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
Expand Down Expand Up @@ -593,8 +592,8 @@ private void onDiscovery0(DiscoveryNotification notification) {
ClusterNode node = notification.getNode();
long topVer = notification.getTopVer();

DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null
: ((CustomMessageWrapper)notification.getCustomMsgData()).delegate();
DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(notification.customMessage() == null ?
null : notification.customMessage());

if (skipMessage(notification.type(), customMsg))
return;
Expand Down Expand Up @@ -933,7 +932,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) {

/** */
@Override public void run() {
DiscoverySpiCustomMessage customMsg = notification.getCustomMsgData();
DiscoveryCustomMessage customMsg = notification.customMessage();

if (customMsg instanceof SecurityAwareCustomMessageWrapper) {
UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
Expand Down Expand Up @@ -2336,7 +2335,7 @@ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedExce

getSpi().sendCustomEvent(security.enabled()
? new SecurityAwareCustomMessageWrapper(msg, security.securityContext().subject().id())
: new CustomMessageWrapper(msg));
: msg);
}
catch (IgniteClientDisconnectedException e) {
IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
Expand Down Expand Up @@ -2941,7 +2940,7 @@ private static class NotificationEvent {
Collection<ClusterNode> topSnapshot;

/** Data. */
@Nullable DiscoveryCustomMessage data;
@Nullable DiscoveryCustomMessage customMsg;

/** Span container. */
SpanContainer spanContainer;
Expand All @@ -2955,7 +2954,7 @@ private static class NotificationEvent {
* @param node Node.
* @param discoCache Disco cache.
* @param topSnapshot Topology snapshot.
* @param data Data.
* @param customMsg Data.
* @param spanContainer Span container.
*/
public NotificationEvent(
Expand All @@ -2964,7 +2963,7 @@ public NotificationEvent(
ClusterNode node,
DiscoCache discoCache,
Collection<ClusterNode> topSnapshot,
@Nullable DiscoveryCustomMessage data,
@Nullable DiscoveryCustomMessage customMsg,
SpanContainer spanContainer,
SecurityContext secCtx
) {
Expand All @@ -2973,7 +2972,7 @@ public NotificationEvent(
this.node = node;
this.discoCache = discoCache;
this.topSnapshot = topSnapshot;
this.data = data;
this.customMsg = customMsg;
this.spanContainer = spanContainer;
this.secCtx = secCtx;
}
Expand Down Expand Up @@ -3072,7 +3071,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED)
* @param notificationEvt Notification event.
*/
void addEvent(NotificationEvent notificationEvt) {
assert notificationEvt.node != null : notificationEvt.data;
assert notificationEvt.node != null : notificationEvt.customMsg;

if (notificationEvt.type == EVT_CLIENT_NODE_DISCONNECTED)
discoWrk.disconnectEvtFut = new GridFutureAdapter();
Expand Down Expand Up @@ -3228,11 +3227,11 @@ private void body0() throws InterruptedException {
customEvt.type(type);
customEvt.topologySnapshot(topVer.topologyVersion(), evt.topSnapshot);
customEvt.affinityTopologyVersion(topVer);
customEvt.customMessage(evt.data);
customEvt.customMessage(evt.customMsg);
customEvt.span(evt.spanContainer != null ? evt.spanContainer.span() : null);

if (evt.discoCache == null) {
assert discoCache != null : evt.data;
assert discoCache != null : evt.customMsg;

evt.discoCache = discoCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/** Extends {@link CustomMessageWrapper} with ID of security subject that initiated the current message. */
public class SecurityAwareCustomMessageWrapper extends CustomMessageWrapper {
/** Custom message wrapper with ID of security subject that initiated the current message. */
public class SecurityAwareCustomMessageWrapper extends DiscoverySpiCustomMessage {
/** */
private static final long serialVersionUID = 0L;

/** Security subject ID. */
private final UUID secSubjId;

/** Original message. */
private final DiscoveryCustomMessage delegate;

/** */
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) {
super(delegate);

this.delegate = delegate;
this.secSubjId = secSubjId;
}

Expand All @@ -42,8 +44,25 @@ public UUID securitySubjectId() {
}

/** {@inheritDoc} */
@Override public @Nullable DiscoverySpiCustomMessage ackMessage() {
DiscoveryCustomMessage ack = delegate().ackMessage();
@Override public boolean isMutable() {
return delegate.isMutable();
}

/** {@inheritDoc} */
@Override public boolean stopProcess() {
return delegate.stopProcess();
}

/**
* @return Delegate.
*/
public DiscoveryCustomMessage delegate() {
return delegate;
}

/** {@inheritDoc} */
@Override public @Nullable DiscoveryCustomMessage ackMessage() {
DiscoveryCustomMessage ack = delegate.ackMessage();

return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiNoop;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
Expand Down Expand Up @@ -101,7 +101,7 @@ public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements Disc
}

/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
@Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
Expand Down Expand Up @@ -8305,4 +8307,14 @@ public static <T extends IgniteDataTransferObject> IgniteDataTransferObjectSeria
return (IgniteDataTransferObjectSerializer<T>)EMPTY_DTO_SERIALIZER;
}
}

/**
* Unwraps messsage if it is wrapped by {@link SecurityAwareCustomMessageWrapper}.
*
* @param msg Message.
*/
public static DiscoveryCustomMessage unwrapCustomMessage(DiscoveryCustomMessage msg) {
return msg instanceof SecurityAwareCustomMessageWrapper ?
((SecurityAwareCustomMessageWrapper)msg).delegate() : msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.NavigableMap;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.jetbrains.annotations.Nullable;

Expand All @@ -41,8 +42,8 @@ public class DiscoveryNotification {
/** Topology history. */
private @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist;

/** Custom message data. */
private @Nullable DiscoverySpiCustomMessage customMsgData;
/** Custom message. */
private @Nullable DiscoveryCustomMessage customMsg;

/** Span container. */
private SpanContainer spanContainer;
Expand All @@ -66,7 +67,7 @@ public DiscoveryNotification(int eventType, long topVer, ClusterNode node, Colle
* @param node Node.
* @param topSnapshot Topology snapshot.
* @param topHist Topology history.
* @param customMsgData Custom message data.
* @param customMsg Custom message.
* @param spanContainer Span container.
*/
public DiscoveryNotification(
Expand All @@ -75,15 +76,15 @@ public DiscoveryNotification(
ClusterNode node,
Collection<ClusterNode> topSnapshot,
@Nullable NavigableMap<Long, Collection<ClusterNode>> topHist,
@Nullable DiscoverySpiCustomMessage customMsgData,
@Nullable DiscoveryCustomMessage customMsg,
SpanContainer spanContainer
) {
this.eventType = eventType;
this.topVer = topVer;
this.node = node;
this.topSnapshot = topSnapshot;
this.topHist = topHist;
this.customMsgData = customMsgData;
this.customMsg = customMsg;
this.spanContainer = spanContainer;
}

Expand Down Expand Up @@ -123,10 +124,10 @@ public NavigableMap<Long, Collection<ClusterNode>> getTopHist() {
}

/**
* @return Custom message data.
* @return Custom message.
*/
public DiscoverySpiCustomMessage getCustomMsgData() {
return customMsgData;
public DiscoveryCustomMessage customMessage() {
return customMsg;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
Expand Down Expand Up @@ -153,7 +154,7 @@ public interface DiscoverySpi extends IgniteSpi {
* @param msg Custom message.
* @throws IgniteException if failed to sent the event message.
*/
public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException;

/**
* Initiates failure of provided node.
Expand Down
Loading
Loading