code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 589c19b8 authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Improved destroy of the LockManager and other improvements

parent 5d434d43
......@@ -15,6 +15,7 @@
*/
package org.onehippo.repository.lock;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
......@@ -44,7 +45,8 @@ public abstract class AbstractLockManager implements InternalLockManager {
private long longestIntervalSeconds = DEFAULT_SCHEDULED_JOBS_INTERVAL_SECONDS;
private boolean destroyed = false;
private volatile boolean destroyed = false;
private boolean destroyInProgress = false;
protected abstract Logger getLogger();
......@@ -59,7 +61,7 @@ public abstract class AbstractLockManager implements InternalLockManager {
protected abstract List<Lock> retrieveLocks() throws LockManagerException;
public AbstractLockManager() {
scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}
protected void addJob(final Runnable runnable) {
......@@ -89,7 +91,7 @@ public abstract class AbstractLockManager implements InternalLockManager {
final Thread lockThread = lock.getThread().get();
if (lockThread == null || !lockThread.isAlive()) {
getLogger().warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Thread '{}' " +
"now gets the lock", lock.getLockOwner(), key, Thread.currentThread().getName());
"now gets the lock", lock.getLockThread(), key, Thread.currentThread().getName());
unlock(key);
localLocks.put(key, createLock(key, Thread.currentThread().getName(), refreshRateSeconds));
return;
......@@ -110,15 +112,15 @@ public abstract class AbstractLockManager implements InternalLockManager {
final MutableLock lock = localLocks.get(key);
if (lock == null) {
getLogger().error("Lock '{}' does not exist or this cluster node does not contain the lock hence a thread from " +
"this JVM cannot unlock it", key);
"this JVM cannot unlock it", key);
return;
}
final Thread lockThread = lock.getThread().get();
if (lockThread == null || !lockThread.isAlive()) {
getLogger().error("Thread '{}' that created lock for '{}' has stopped without releasing the lock. The Thread " +
"should had invoked #unlock. Removing lock now",
lock.getLockOwner(), key, Thread.currentThread().getName());
releasePersistedLock(key, lockThread.getName());
lock.getLockThread(), key, Thread.currentThread().getName());
releasePersistedLock(key, lock.getLockThread());
localLocks.remove(key);
return;
}
......@@ -188,17 +190,53 @@ public abstract class AbstractLockManager implements InternalLockManager {
return retrieveLocks();
}
/**
* Destroy MOST not be synchronized because other threads need to be able to #unlock
*/
@Override
public synchronized void destroy() {
checkLive();
destroyed = true;
public void destroy() {
synchronized (this) {
if (destroyInProgress) {
return;
}
destroyInProgress = true;
}
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(longestIntervalSeconds + 5, SECONDS);
boolean success = scheduledExecutorService.awaitTermination(longestIntervalSeconds + 5, SECONDS);
if (!success) {
getLogger().warn("Not all jobs have been successfully completed.");
}
} catch (InterruptedException e) {
getLogger().error("InterruptedException during shutdown of scheduledExecutorService : ", e);
}
final List<Thread> waitForThreads = new ArrayList<>();
for (MutableLock mutableLock : localLocks.values()) {
Thread thread = mutableLock.getThread().get();
if (thread == null || !thread.isAlive() || thread.isInterrupted()) {
continue;
}
thread.interrupt();
waitForThreads.add(thread);
}
// try graceful shutdown for the interrupted threads
long waitMax = 10_000;
for (Thread thread : waitForThreads) {
try {
long start = System.currentTimeMillis();
thread.join(waitMax);
waitMax -= System.currentTimeMillis() - start;
} catch (InterruptedException e) {
getLogger().info("Thread '{}' already interrupted");
thread.interrupt();
}
}
clear();
destroyed = true;
}
@Override
......@@ -224,7 +262,7 @@ public abstract class AbstractLockManager implements InternalLockManager {
Map.Entry<String, MutableLock> next = iterator.next();
MutableLock lock = next.getValue();
if (lock.getThread().get() == null || !lock.getThread().get().isAlive()) {
getLogger().warn("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock already stopped. " +
getLogger().error("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock already stopped. " +
"Removing the lock now", next.getKey(), lock.getLockOwner());
releasePersistedLock(next.getKey(), next.getValue().getLockThread());
iterator.remove();
......@@ -238,26 +276,23 @@ public abstract class AbstractLockManager implements InternalLockManager {
}
}
@Override
public void addJob(final Runnable runnable, final long initialDelaySeconds, final long periodSeconds) {
// make the runnable synchronized to avoid concurrency from background jobs with methods in the LockManager
// modifying 'locks'
final Runnable synchronizedRunnable = () -> {
synchronized (AbstractLockManager.this) {
final long start = System.currentTimeMillis();
getLogger().info("Running '{}' at {}", runnable.getClass().getName(), Calendar.getInstance().getTime());
try {
runnable.run();
} catch (Exception e) {
getLogger().error("Background job '{}' resulted in exception.", runnable.getClass().getName(), e);
}
getLogger().info("Running '{}' finished in '{}' ms.", runnable.getClass().getName(), (System.currentTimeMillis() - start));
final Runnable exceptionCatchingRunnable = () -> {
final long start = System.currentTimeMillis();
getLogger().info("Running '{}' at {}", runnable.getClass().getName(), Calendar.getInstance().getTime());
try {
runnable.run();
} catch (Exception e) {
getLogger().error("Background job '{}' resulted in exception.", runnable.getClass().getName(), e);
}
getLogger().info("Running '{}' finished in '{}' ms.", runnable.getClass().getName(), (System.currentTimeMillis() - start));
};
if (periodSeconds > longestIntervalSeconds) {
longestIntervalSeconds = periodSeconds;
}
scheduledExecutorService.scheduleAtFixedRate(synchronizedRunnable, initialDelaySeconds, periodSeconds, SECONDS);
scheduledExecutorService.scheduleAtFixedRate(exceptionCatchingRunnable, initialDelaySeconds, periodSeconds, SECONDS);
}
/**
......
......@@ -28,9 +28,4 @@ public interface InternalLockManager extends LockManager {
*/
void clear();
/**
* Adds a job to be scheduled. This method is part of this {@link InternalLockManager} because in integration tests
* we can then set the initialDelaySeconds and periodSeconds very low (for testing concurrency)
*/
void addJob(Runnable runnable, long initialDelaySeconds, long periodSeconds);
}
......@@ -58,8 +58,6 @@ public class DbLockManager extends AbstractLockManager {
public static final String SELECT_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockKey=?";
public static final String LOCK_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET status='RUNNING', lockTime=?, expirationTime=?, lockOwner=?, lockThread=? WHERE lockKey=? AND status='FREE'";
public static final String EXPIRED_BLOCKING_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE expirationTime<? AND (status='RUNNING' OR status='ABORT') FOR UPDATE";
public static final String ALL_LOCKED_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE status='RUNNING' OR status='ABORT'";
public static final String RESET_LOCK_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET " +
......@@ -72,21 +70,22 @@ public class DbLockManager extends AbstractLockManager {
"WHERE lockKey=? AND lockOwner=? AND lockThread=?";
public static final String RESET_LOCK_STATEMENT_BY_KEY_ONLY = "UPDATE " + TABLE_NAME_LOCK + " SET " +
public static final String RESET_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET " +
"lockOwner=NULL, " +
"lockThread=NULL, " +
"status='FREE', " +
"lockTime=0, " +
"refreshRateSeconds=0, " +
"expirationTime=0 " +
"WHERE lockKey=?";
"WHERE expirationTime<? AND (status='RUNNING' OR status='ABORT')";
public static final String ABORT_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET status='ABORT' WHERE lockKey=?";
public static final String ABORT_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET status='ABORT' WHERE lockKey=? AND status='RUNNING'";
// only refreshes its own cluster locks
public static final String LOCKS_TO_REFRESH_BLOCKING_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockOwner=? AND expirationTime<? AND (status='RUNNING' OR status='ABORT') FOR UPDATE";
public static final String REFRESH_LOCK_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET expirationTime=? WHERE lockKey=?";
public static final String REFRESH_LOCK_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET expirationTime=expirationTime+60000 " +
"WHERE lockOwner=? AND expirationTime<? AND (status='RUNNING' OR status='ABORT')";
public static final String SELECT_ABORT_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE status='ABORT' AND lockOwner=?";
private final DataSource dataSource;
private final String clusterNodeId;
......@@ -125,6 +124,7 @@ public class DbLockManager extends AbstractLockManager {
lockStatement.setString(4, threadName);
lockStatement.setString(5, key);
int changed = lockStatement.executeUpdate();
lockStatement.close();
if (changed == 0) {
log.debug("Either there is already a row entry for key '{}' which is not free OR the entry is not yet " +
......@@ -138,6 +138,7 @@ public class DbLockManager extends AbstractLockManager {
createStatement.setLong(6, expirationTime);
try {
createStatement.execute();
createStatement.close();
} catch (SQLException e) {
throw new LockException(String.format("Lock for '%s' is already taken", key), e);
}
......@@ -170,6 +171,7 @@ public class DbLockManager extends AbstractLockManager {
resetLockStatement.setString(2, clusterNodeId);
resetLockStatement.setString(3, threadName);
int changed = resetLockStatement.executeUpdate();
resetLockStatement.close();
if (changed == 0) {
final PreparedStatement selectStatement = connection.prepareStatement(SELECT_STATEMENT);
selectStatement.setString(1, key);
......@@ -177,10 +179,12 @@ public class DbLockManager extends AbstractLockManager {
if (!resultSet.next()) {
log.error("Database Lock '{}' cannot be released by '{}' and cluster '{}' because lock does not exist",
key, threadName, clusterNodeId);
selectStatement.close();
return;
} else {
log.error("Database Lock '{}' cannot be released for thread '{}' and cluster '{}' because lock is not owned.",
key, threadName, clusterNodeId);
selectStatement.close();
return;
}
}
......@@ -207,9 +211,12 @@ public class DbLockManager extends AbstractLockManager {
final PreparedStatement abortStatement = connection.prepareStatement(ABORT_STATEMENT);
abortStatement.setString(1, key);
int changed = abortStatement.executeUpdate();
abortStatement.close();
if (changed == 0) {
// can happen because by another Thread or cluster node already stopped in the meantime
log.info("Cannot set status to abort for '{}' because no such lock present.", key);
log.info("Cannot set status to abort for '{}' because no such lock present or already aborted or not runnning.",
key);
return;
}
log.info("Successfully changed status to abort for '{}'", key);
......@@ -230,9 +237,12 @@ public class DbLockManager extends AbstractLockManager {
ResultSet resultSet = selectStatement.executeQuery();
if (!resultSet.next()) {
log.debug("No database row found for lockKey '{}'", key);
selectStatement.close();
return false;
}
final String status = resultSet.getString("status");
selectStatement.close();
boolean locked = "RUNNING".equals(status) || "ABORT".equals(status);
log.debug("Found database row for '{}' with locked = {}", key, locked);
return locked;
......@@ -259,6 +269,8 @@ public class DbLockManager extends AbstractLockManager {
log.debug("Adding lock : {}", lock.toString());
locks.add(lock);
}
resultSet.close();
if (locks.size() == 0) {
log.debug("No locks found");
}
......
......@@ -17,7 +17,6 @@ package org.onehippo.repository.lock.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;
......@@ -26,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onehippo.repository.lock.db.DbHelper.close;
import static org.onehippo.repository.lock.db.DbLockManager.LOCKS_TO_REFRESH_BLOCKING_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.REFRESH_LOCK_STATEMENT;
/**
......@@ -53,23 +51,14 @@ public class DbLockRefresher implements Runnable {
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final PreparedStatement locksToRefreshStatement = connection.prepareStatement(LOCKS_TO_REFRESH_BLOCKING_STATEMENT);
locksToRefreshStatement.setString(1, clusterNodeId);
connection.setAutoCommit(true);
final PreparedStatement refreshStatement = connection.prepareStatement(REFRESH_LOCK_STATEMENT);
refreshStatement.setString(1, clusterNodeId);
// select all rows that have less than 20 seconds to live
locksToRefreshStatement.setLong(2, System.currentTimeMillis() + 20000);
ResultSet resultSet = locksToRefreshStatement.executeQuery();
while (resultSet.next()) {
// found lock to refresh
final String lockKey = resultSet.getString("lockKey");
final int refreshRateSeconds = resultSet.getInt("refreshRateSeconds");
log.info("Refreshing row with lockKey '{}'", lockKey);
final PreparedStatement unlockStatement = connection.prepareStatement(REFRESH_LOCK_STATEMENT);
unlockStatement.setLong(1, System.currentTimeMillis() + refreshRateSeconds * 1000);
unlockStatement.setString(2, lockKey);
unlockStatement.execute();
}
connection.commit();
refreshStatement.setLong(2, System.currentTimeMillis() + 20000);
int updated = refreshStatement.executeUpdate();
log.info("Refreshed {} locks", updated);
refreshStatement.close();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.info("Exception in {} happened. Possibly another cluster node did already reset some lock rows:", this.getClass().getName(), e);
......
......@@ -17,15 +17,15 @@ package org.onehippo.repository.lock.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onehippo.repository.lock.db.DbHelper.close;
import static org.onehippo.repository.lock.db.DbLockManager.EXPIRED_BLOCKING_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.RESET_LOCK_STATEMENT_BY_KEY_ONLY;
import static org.onehippo.repository.lock.db.DbLockManager.RESET_STATEMENT;
/**
* Resets expired locks to 'FREE' if they are in state 'RUNNING' or 'ABORT'
......@@ -47,22 +47,13 @@ public class DbResetExpiredLocksJanitor implements Runnable {
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
// since the 'expired blocking statement' can modify the same rows as another cluster node, we use
// autocommit 'false' and use a blocking statement
final PreparedStatement expiredBlockingStatement = connection.prepareStatement(EXPIRED_BLOCKING_STATEMENT);
expiredBlockingStatement.setLong(1, System.currentTimeMillis());
ResultSet resultSet = expiredBlockingStatement.executeQuery();
while (resultSet.next()) {
// found expired lock. Reset it
String lockKey = resultSet.getString("lockKey");
log.info("Resetting row with lockKey '{}' because expired", lockKey);
final PreparedStatement resetLockStatement = connection.prepareStatement(RESET_LOCK_STATEMENT_BY_KEY_ONLY);
resetLockStatement.setString(1, lockKey);
resetLockStatement.execute();
}
connection.commit();
} catch (Exception e) {
connection.setAutoCommit(true);
final PreparedStatement resetStatement = connection.prepareStatement(RESET_STATEMENT);
resetStatement.setLong(1, System.currentTimeMillis());
int updated = resetStatement.executeUpdate();
log.info("Expired {} locks", updated);
resetStatement.close();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.info("Exception in {} happened. Possibly another cluster node did already reset some lock rows:", this.getClass().getName(), e);
} else {
......
......@@ -27,11 +27,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onehippo.repository.lock.db.DbHelper.close;
import static org.onehippo.repository.lock.db.DbLockManager.SELECT_ABORT_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.TABLE_NAME_LOCK;
/**
* A Thread that contains a lock marked with 'ABORT' will be interrupted : In turn, that Thread should invoke #unlock itself
* If there is no Thread for the 'ABORT' marked lock, the lock will be reset to 'FREE'
* If there is no Thread for the 'ABORT' marked lock and the lock is for the current cluster node, the lock will be reset to 'FREE'
*/
public class LockThreadInterrupter implements Runnable {
......@@ -41,9 +42,6 @@ public class LockThreadInterrupter implements Runnable {
private final String clusterNodeId;
private final DbLockManager dbLockManager;
public static final String SELECT_ABORT_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE status='ABORT' AND lockOwner=?";
public LockThreadInterrupter(final DataSource dataSource, final String clusterNodeId, final DbLockManager dbLockManager) {
this.dataSource = dataSource;
this.clusterNodeId = clusterNodeId;
......@@ -63,10 +61,10 @@ public class LockThreadInterrupter implements Runnable {
ResultSet resultSet = selectAbortStatement.executeQuery();
while (resultSet.next()) {
// interrupt the thread for this lock (if still present). Otherwise ignore.
String lockKey = resultSet.getString("lockKey");
final String lockKey = resultSet.getString("lockKey");
final String lockThread = resultSet.getString("lockThread");
if (lockThread == null) {
log.error("Cannot abort db entry '{}' for which lockThread is null.", lockKey);
log.error("Illegal database row state: cannot abort db entry '{}' for which lockThread is null.", lockKey);
continue;
}
boolean lockThreadForAbortFound = false;
......@@ -74,7 +72,12 @@ public class LockThreadInterrupter implements Runnable {
Thread thread = lock.getThread().get();
if (thread == null || !thread.isAlive()) {
// ignore since will be picked up by org.onehippo.services.lock.AbstractLockManager.UnlockStoppedThreadJanitor
} else if (lockThread.equals(thread.getName()) && lockKey.equals(lock.getLockKey())){
} else if (lockKey.equals(lock.getLockKey())){
if (!lockThread.equals(thread.getName())) {
log.error("Lock thread in JVM is other one than in database for lock '{}' which is an illegal state.",
lockKey);
continue;
}
// best effort : thread interrupt : As a result, the Thread containing the lock should invoke
// #unlock at some point in time
// There are no guarantees beyond best-effort attempts to stop
......@@ -102,6 +105,7 @@ public class LockThreadInterrupter implements Runnable {
"lock has been expired it will be removed by DbLockResetJanitor", lockKey);
}
}
selectAbortStatement.close();
} catch (Exception e) {
log.error("Exception in {} happened:", this.getClass().getName(), e);
} finally {
......
......@@ -48,7 +48,6 @@ public abstract class AbstractLockManagerTest extends RepositoryTestCase {
protected DataSource dataSource;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
lockManager = (InternalLockManager)HippoServiceRegistry.getService(LockManager.class);
......
......@@ -94,7 +94,7 @@ public class LockManagerAbortTest extends AbstractLockManagerTest {
}
@Test
public void a_lock_set_to_abort_is_still_freed_on_lockmanager_clear_but_not_being_interrupted_any_more() throws Exception {
public void a_lock_set_to_abort_is_still_set_to_free_on_lockmanager_clear_but_not_being_interrupted_any_more() throws Exception {
if (dataSource == null) {
// in memory test
return;
......
package org.onehippo.repository.lock;
import javax.jcr.Session;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.repository.lock.db.DbLockManager;
import org.onehippo.repository.lock.memory.MemoryLockManager;
import org.onehippo.testutils.log4j.Log4jInterceptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class LockManagerDestroyTest extends AbstractLockManagerTest {
@Override
@After
public void tearDown() throws Exception {
// since LockManager#destroy removes destroys the lock manager, we need to recreate the repository per test
super.tearDown(true);
}
@Test(expected = IllegalStateException.class)
public void after_destroy_the_lock_manager_cannot_lock_any_more() throws Exception {
lockManager.destroy();
lockManager.lock("123");
}
@Test
public void destroy_lock_manager_interrupts_running_lock_threads() throws Exception {
final String key = "123";
final LockRunnable runnable = new LockRunnable(key, true);
final Thread lockThread = new Thread(runnable);
lockThread.start();
// give lockThread time to lock
Thread.sleep(100);
dbRowAssertion(key, "RUNNING");
try (Log4jInterceptor interceptor = Log4jInterceptor.onWarn().trap(MemoryLockManager.class, DbLockManager.class).build()) {
lockManager.destroy();
assertEquals(0, interceptor.messages().count());
}
assertTrue("Thread containing lock 123 should had been interrupted ", runnable.interrupted);
assertFalse(lockThread.isAlive());
// assert db record lock is 'free'
dbRowAssertion(key, "FREE");
}
@Test
public void if_running_lock_thread_does_not_stop_on_interrupt_it_takes_10_seconds_to_destroy_and_a_warning_is_logged() throws Exception {
final String key = "123";
final LockUnstoppableRunnable unstoppable = new LockUnstoppableRunnable(key, true);
final Thread unstoppableLockThread = new Thread(unstoppable);
unstoppableLockThread.start();
// give lockThread time to lock
Thread.sleep(100);
dbRowAssertion(key, "RUNNING");
try (Log4jInterceptor interceptor = Log4jInterceptor.onWarn().trap(MemoryLockManager.class, DbLockManager.class).build()) {
final long start = System.currentTimeMillis();
lockManager.destroy();
assertTrue("Lock Manager destroy was expected to take at least 10 seconds to wait for all interrupted threads " +
"holding a lock to #unlock", (System.currentTimeMillis() - start ) >= 10_000);
assertTrue(interceptor.messages().anyMatch(m -> m.contains("Lock '123' owned by '"+getClusterNodeId(session)+"' was never unlocked. Removing the lock now.")));
}
assertTrue("Thread containing lock 123 should had been interrupted ", unstoppable.interrupted);
// the thread never died and never did unlock
assertTrue(unstoppableLockThread.isAlive());
// however the thread never unlocked the lock, the LockManager did so in #clear
dbRowAssertion(key, "FREE");
// stop the thread
unstoppable.keepAlive = false;
unstoppableLockThread.join();
}
private String getClusterNodeId(final Session session) {
String clusterNodeId = session.getRepository().getDescriptor("jackrabbit.cluster.id");
if (clusterNodeId == null) {
clusterNodeId = "default";
}
return clusterNodeId;
}
protected class LockRunnable implements Runnable {
private String key;
private volatile boolean keepAlive;
private boolean interrupted;
LockRunnable(final String key, final boolean keepAlive) {
this.key = key;
this.keepAlive = keepAlive;
}
@Override
public void run() {
try {
lockManager.lock(key);
while (keepAlive) {
Thread.sleep(25);
}
} catch (LockException e) {
} catch (InterruptedException e) {
lockManager.unlock(key);
interrupted = true;
}
}
}
protected class LockUnstoppableRunnable implements Runnable {
private String key;
private volatile boolean keepAlive;
private boolean interrupted;
LockUnstoppableRunnable(final String key, final boolean keepAlive) {
this.key = key;
this.keepAlive = keepAlive;
}
@Override
public void run() {
// NOTE on purpose wrong construct since never #unlock is called
while (keepAlive) {
try {
lockManager.lock(key);
Thread.sleep(25);
} catch (LockException | InterruptedException e) {
interrupted = true;
} catch (IllegalStateException e){
// happens after the lock manager has been destroyed
}
}
}
}
}
......@@ -34,7 +34,7 @@ public class LockManagerRefreshTest extends AbstractLockManagerTest {
@Test
public void locks_gets_refreshed_while_still_in_use() throws Exception {
public void locks_get_refreshed_while_still_in_use() throws Exception {
if (dataSource == null) {
// db test only
return;
......@@ -57,7 +57,7 @@ public class LockManagerRefreshTest extends AbstractLockManagerTest {
long start = System.currentTimeMillis();
while (expires == getExpireTime(key)) {
if ((System.currentTimeMillis() - start) > 10000) {
fail("Within 5 seconds DbLockRefresher should had bumped the expires time but it didn't do it after 6 seconds");
fail("Within 5 seconds DbLockRefresher should had bumped the expires time but it didn't do it after 10 seconds");
}
Thread.sleep(100);
}
......
......@@ -16,4 +16,3 @@
Updating /test/foo/bar failed
Failed to acquire next trigger
Removing the lock now
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment