001package org.apache.commons.jcs3.engine;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArraySet;
028
029import org.apache.commons.jcs3.engine.behavior.ICacheListener;
030import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
031import org.apache.commons.jcs3.log.Log;
032import org.apache.commons.jcs3.log.LogManager;
033
034/**
035 * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be
036 * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS
037 * is not implemented at this stage for it can be too expensive.)
038 */
039public class CacheWatchRepairable
040    implements ICacheObserver
041{
042    /** The logger */
043    private static final Log log = LogManager.getLog( CacheWatchRepairable.class );
044
045    /** the underlying ICacheObserver. */
046    private ICacheObserver cacheWatch;
047
048    /** Map of cache regions. */
049    private final ConcurrentMap<String, Set<ICacheListener<?, ?>>> cacheMap =
050        new ConcurrentHashMap<>();
051
052    /**
053     * Replaces the underlying cache watch service and re-attaches all existing listeners to the new
054     * cache watch.
055     * <p>
056     * @param cacheWatch The new cacheWatch value
057     */
058    public void setCacheWatch( final ICacheObserver cacheWatch )
059    {
060        this.cacheWatch = cacheWatch;
061        cacheMap.forEach((cacheName, value) -> value.forEach(listener -> {
062                try
063                {
064                    log.info( "Adding listener to cache watch. ICacheListener = "
065                            + "{0} | ICacheObserver = {1}", listener, cacheWatch );
066                    cacheWatch.addCacheListener( cacheName, listener );
067                }
068                catch ( final IOException ex )
069                {
070                    log.error( "Problem adding listener. ICacheListener = {0} | "
071                            + "ICacheObserver = {1}", listener, cacheWatch, ex );
072                }
073        }));
074    }
075
076    /**
077     * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
078     * <p>
079     * @param cacheName The feature to be added to the CacheListener attribute
080     * @param obj The feature to be added to the CacheListener attribute
081     * @throws IOException
082     */
083    @Override
084    public <K, V> void addCacheListener( final String cacheName, final ICacheListener<K, V> obj )
085        throws IOException
086    {
087        // Record the added cache listener locally, regardless of whether the
088        // remote add-listener operation succeeds or fails.
089        cacheMap.computeIfAbsent(cacheName, key -> new CopyOnWriteArraySet<>(Collections.singletonList(obj)));
090
091        log.info( "Adding listener to cache watch. ICacheListener = {0} | "
092                + "ICacheObserver = {1} | cacheName = {2}", obj, cacheWatch,
093                cacheName );
094        cacheWatch.addCacheListener( cacheName, obj );
095    }
096
097    /**
098     * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
099     * <p>
100     * @param obj The feature to be added to the CacheListener attribute
101     * @throws IOException
102     */
103    @Override
104    public <K, V> void addCacheListener( final ICacheListener<K, V> obj )
105        throws IOException
106    {
107        // Record the added cache listener locally, regardless of whether the
108        // remote add-listener operation succeeds or fails.
109        cacheMap.values().forEach(set -> set.add(obj));
110
111        log.info( "Adding listener to cache watch. ICacheListener = {0} | "
112                + "ICacheObserver = {1}", obj, cacheWatch );
113        cacheWatch.addCacheListener( obj );
114    }
115
116    /**
117     * Tell the server to release us.
118     * <p>
119     * @param cacheName
120     * @param obj
121     * @throws IOException
122     */
123    @Override
124    public <K, V> void removeCacheListener( final String cacheName, final ICacheListener<K, V> obj )
125        throws IOException
126    {
127        log.info( "removeCacheListener, cacheName [{0}]", cacheName );
128        // Record the removal locally, regardless of whether the remote
129        // remove-listener operation succeeds or fails.
130        final Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
131        if ( listenerSet != null )
132        {
133            listenerSet.remove( obj );
134        }
135        cacheWatch.removeCacheListener( cacheName, obj );
136    }
137
138    /**
139     * @param obj
140     * @throws IOException
141     */
142    @Override
143    public <K, V> void removeCacheListener( final ICacheListener<K, V> obj )
144        throws IOException
145    {
146        log.info( "removeCacheListener, ICacheListener [{0}]", obj );
147
148        // Record the removal locally, regardless of whether the remote
149        // remove-listener operation succeeds or fails.
150        cacheMap.values().forEach(set -> {
151            log.debug("Before removing [{0}] the listenerSet = {1}", obj, set);
152            set.remove( obj );
153        });
154        cacheWatch.removeCacheListener( obj );
155    }
156}