code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 8e614323 authored by Ate Douma's avatar Ate Douma

REPO-1873 Support closing a LockResource by a different thread - implementation

parent 731d3bb3
......@@ -101,22 +101,38 @@ public abstract class AbstractLockManager implements InternalLockManager {
}
private class LockResourceImpl implements LockResource {
private MutableLock lock;
private final MutableLock lock;
private final boolean newLock;
private boolean closed = false;
LockResourceImpl(final MutableLock lock) {
this.lock = lock;
this.newLock = lock.getHoldCount() == 1;
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public void close() {
unlock(lock.getLockKey());
if (!closed) {
closed = true;
unlock(this);
}
}
@Override
public Lock getLock() {
public MutableLock getLock() {
return lock;
}
@Override
public boolean isNewLock() {
return newLock;
}
@Override
public Thread getHolder() {
return lock.getThread().get();
......@@ -127,42 +143,59 @@ public abstract class AbstractLockManager implements InternalLockManager {
public synchronized void unlock(final String key) {
checkLive();
validateKey(key);
final MutableLock lock = localLocks.get(key);
if (lock == null) {
getLogger().error("Lock '{}' does not exist or this cluster node does not contain the lock hence a thread from " +
"this JVM cannot unlock it", key);
unlock(key, null);
}
protected synchronized void unlock(final LockResource lockResource) {
checkLive();
unlock(lockResource.getLock().getLockKey(), lockResource.getLock());
}
protected void unlock(final String key, final Lock lock) {
final MutableLock localLock = localLocks.get(key);
if (localLock == null) {
if (lock == null) {
getLogger().error("Lock '{}' does not exist or this cluster node does not contain the lock hence a thread from " +
"this JVM cannot unlock it", key);
} else {
final String msg = String.format("Lock '%s' already manually unlocked in thread '%s'. This is a coding error!", key, lock.getLockThread());
getLogger().error(msg, new IllegalStateException(msg));
}
return;
} else if (lock != null && lock != localLock) {
final String msg = String.format("Lock '%s' already unlocked before AND locked again. This is a coding error!", key);
getLogger().error(msg, new IllegalStateException(msg));
return;
}
final Thread lockThread = lock.getThread().get();
final Thread lockThread = localLock.getThread().get();
if (lockThread == null || !lockThread.isAlive()) {
getLogger().error("Thread '{}' that created lock for '{}' has stopped without releasing the lock. The Thread " +
"should had invoked #unlock. Removing lock now",
lock.getLockThread(), key, Thread.currentThread().getName());
releasePersistedLock(key, lock.getLockThread());
localLock.getLockThread(), key, Thread.currentThread().getName());
releasePersistedLock(key, localLock.getLockThread());
localLocks.remove(key);
return;
}
if (lockThread != Thread.currentThread()) {
if (lock == null && lockThread != Thread.currentThread()) {
getLogger().error("Thread '{}' cannot unlock '{}' because lock owned by '{}'. Thread '{}' should never had " +
"invoked #unlock({}), this is an end implementation error.", Thread.currentThread().getName(), key,
"invoked #unlock({}), This is a coding error!", Thread.currentThread().getName(), key,
lockThread.getName(), Thread.currentThread().getName(), key);
return;
}
lock.decrement();
if (lock.getHoldCount() < 0) {
localLock.decrement();
if (localLock.getHoldCount() < 0) {
getLogger().error("Hold count of lock should never be able to be less than 0. Core implementation issue in {}. Remove " +
"lock for {} nonetheless.",
this.getClass().getName(), key);
localLocks.remove(key);
releasePersistedLock(key, lockThread.getName());
} else if (lock.getHoldCount() == 0) {
} else if (localLock.getHoldCount() == 0) {
getLogger().debug("Remove lock '{}'", key);
localLocks.remove(key);
releasePersistedLock(key, lockThread.getName());
} else {
getLogger().debug("Lock '{}' will not be removed since hold count is '{}'", key, lock.getHoldCount());
getLogger().debug("Lock '{}' will not be removed since hold count is '{}'", key, localLock.getHoldCount());
}
}
@Override
......@@ -198,7 +231,18 @@ public abstract class AbstractLockManager implements InternalLockManager {
public synchronized boolean isLocked(final String key) throws LockManagerException {
checkLive();
validateKey(key);
expungeNeverUnlockedLocksFromStoppedThreads();
final MutableLock lock = localLocks.get(key);
if (lock != null) {
final Thread thread = lock.getThread().get();
if (thread == null || !thread.isAlive()) {
getLogger().error("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock already stopped. " +
"Removing the lock now", key, lock.getLockOwner());
releasePersistedLock(key, lock.getLockThread());
localLocks.remove(key);
return false;
}
return true;
}
return containsLock(key);
}
......@@ -279,9 +323,10 @@ public abstract class AbstractLockManager implements InternalLockManager {
public synchronized void expungeNeverUnlockedLocksFromStoppedThreads() {
final Iterator<Map.Entry<String, MutableLock>> iterator = localLocks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, MutableLock> next = iterator.next();
MutableLock lock = next.getValue();
if (lock.getThread().get() == null || !lock.getThread().get().isAlive()) {
final Map.Entry<String, MutableLock> next = iterator.next();
final MutableLock lock = next.getValue();
final Thread thread = lock.getThread().get();
if (thread == null || !thread.isAlive()) {
getLogger().error("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock already stopped. " +
"Removing the lock now", next.getKey(), lock.getLockOwner());
releasePersistedLock(next.getKey(), next.getValue().getLockThread());
......@@ -291,8 +336,8 @@ public abstract class AbstractLockManager implements InternalLockManager {
}
private void validateKey(final String key) {
if (key == null || key.length() > 256) {
throw new IllegalArgumentException("Key is not allowed to be null or longer than 256 chars");
if (key == null || key.length() > LOCK_KEY_MAX_LENGTH) {
throw new IllegalArgumentException(String.format("Key is not allowed to be null or longer than %s chars", LOCK_KEY_MAX_LENGTH));
}
}
......
......@@ -21,7 +21,7 @@ import org.onehippo.cms7.services.lock.Lock;
public class MutableLock extends Lock {
private WeakReference<Thread> thread;
private final WeakReference<Thread> thread;
private int holdCount;
......
......@@ -20,6 +20,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
......@@ -145,7 +146,7 @@ public class LockManagerBasicTest extends AbstractLockManagerTest {
}
@Test
public void same_thread_can_unlock_() throws Exception {
public void same_thread_can_unlock() throws Exception {
final String key = "123";
lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
......@@ -155,6 +156,21 @@ public class LockManagerBasicTest extends AbstractLockManagerTest {
assertEquals(0, lockManager.getLocks().size());
}
@Test
public void manually_unlock_when_using_lock_resource() throws Exception {
final String key = "123";
final LockResource lockResource = lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
lockManager.unlock(key);
dbRowAssertion(key, "FREE");
assertDbRowDoesExist(key);
assertEquals(0, lockManager.getLocks().size());
try (Log4jInterceptor interceptor = Log4jInterceptor.onError().trap(MemoryLockManager.class, DbLockManager.class).build()) {
lockResource.close();
assertTrue(interceptor.messages().anyMatch(m -> m.contains("Lock '"+key+"' already manually unlocked in thread '"+Thread.currentThread().getName()+"'. This is a coding error!")));
}
}
@Test
public void unlock_non_existing_lock() throws Exception {
try (Log4jInterceptor interceptor = Log4jInterceptor.onWarn().trap(MemoryLockManager.class, DbLockManager.class).build()) {
......@@ -164,7 +180,7 @@ public class LockManagerBasicTest extends AbstractLockManagerTest {
}
@Test
public void other_thread_cannot_unlock_() throws Exception {
public void other_thread_cannot_unlock() throws Exception {
final String key = "123";
lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
......@@ -201,6 +217,78 @@ public class LockManagerBasicTest extends AbstractLockManagerTest {
dbRowAssertion(key, "FREE");
}
@Test
public void other_thread_can_unlock_using_lock_resource() throws Exception {
final String key = "123";
final LockResource lockResource = lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
Thread lockThread = new Thread(() -> {
try {
assertTrue(lockManager.isLocked(key));
} catch (LockManagerException e) {
fail("#isLocked should not fail : " + e.toString());
}
try {
dbRowAssertion(key, "RUNNING");
} catch (SQLException e1) {
fail(e1.toString());
}
lockResource.close();
try {
assertFalse(lockManager.isLocked(key));
} catch (LockManagerException e) {
fail("#isLocked should not fail : " + e.toString());
}
try {
dbRowAssertion(key, "FREE");
} catch (SQLException e1) {
fail(e1.toString());
}
});
lockThread.start();
lockThread.join();
assertEquals(0, lockManager.getLocks().size());
}
@Test
public void cannot_close_lock_resource_after_already_unlocked_before_AND_locked_again() throws Exception {
final String key = "123";
final LockResource lockResource = lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
final CountDownLatch lockLatch = new CountDownLatch(1);
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch unlockLatch = new CountDownLatch(1);
// bad pattern (to be tested): manually unlock while also using a lockResource
lockManager.unlock(key);
Thread lockThread = new Thread(() -> {
try {
lockLatch.await();
try (LockResource ignore = lockManager.lock(key)) {
// surprise: another thread stole the lock
testLatch.countDown();
unlockLatch.await();
}
} catch (Exception ignore) {
}
});
lockThread.start();
// let other thread steal the lock
lockLatch.countDown();
// wait for the lock to be stolen
testLatch.await();
try (Log4jInterceptor interceptor = Log4jInterceptor.onError().trap(MemoryLockManager.class, DbLockManager.class).build()) {
// no try to close the lockResource
lockResource.close();
// surprise: lock stolen (error message only)
assertTrue(interceptor.messages().anyMatch(m -> m.contains("Lock '"+key+"' already unlocked before AND locked again. This is a coding error!")));
}
// now also have the stolen lock closed
unlockLatch.countDown();
lockThread.join();
assertEquals(0, lockManager.getLocks().size());
}
@Test
public void when_other_thread_contains_lock_a_lock_exception_is_thrown_on_lock_attempt() throws Exception {
final String key = "123";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment