code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 441beff4 authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Make sure getLocks and isLocked works cluster wide in case of a database

parent cbf14aed
......@@ -15,7 +15,6 @@
*/
package org.onehippo.repository.lock;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
......@@ -48,11 +47,15 @@ public abstract class AbstractLockManager implements InternalLockManager {
protected abstract Logger getLogger();
protected abstract MutableLock createLock(final String key, final String threadName, final int refreshRateSeconds) throws LockException;
protected abstract MutableLock createLock(String key, String threadName, int refreshRateSeconds) throws LockException;
protected abstract void releasePersistedLock(final String key, final String threadName) throws LockException;
protected abstract void releasePersistedLock(String key, String threadName) throws LockException;
protected abstract void abortPersistedLock(final String key) throws LockException;
protected abstract void abortPersistedLock(String key) throws LockException;
protected abstract boolean containsLock(String key) throws LockException;
protected abstract List<Lock> retrieveLocks() throws LockException;
public AbstractLockManager() {
scheduledExecutorService = Executors.newScheduledThreadPool(1);
......@@ -169,16 +172,14 @@ public abstract class AbstractLockManager implements InternalLockManager {
checkLive();
validateKey(key);
expungeNeverUnlockedLocksFromStoppedThreads();
// TODO in case of database, check agains database locks
return localLocks.containsKey(key);
return containsLock(key);
}
@Override
public synchronized List<Lock> getLocks() {
public synchronized List<Lock> getLocks() throws LockException {
checkLive();
expungeNeverUnlockedLocksFromStoppedThreads();
// TODO in case of database, get database locks
return new ArrayList<>(localLocks.values());
return retrieveLocks();
}
@Override
......
......@@ -25,8 +25,9 @@ public class MutableLock extends Lock {
int holdCount;
public MutableLock(final String lockKey, final String lockOwner, final String lockThread, final long creationTime) {
super(lockKey, lockOwner, lockThread, creationTime);
public MutableLock(final String lockKey, final String lockOwner, final String lockThread,
final long creationTime, final String status) {
super(lockKey, lockOwner, lockThread, creationTime, status);
thread = new WeakReference<>(Thread.currentThread());
holdCount = 1;
}
......
......@@ -75,6 +75,16 @@ public class DbHelper {
return resultSet.next();
}
public static void close(final Connection connection) {
if (connection == null) {
return;
}
try {
connection.close();
} catch (SQLException e) {
log.error("Failed to close connection.", e);
}
}
public static void close(final Connection connection, final boolean originalAutoCommit) {
if (connection == null) {
......
......@@ -19,9 +19,13 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.sql.DataSource;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.repository.lock.AbstractLockManager;
import org.onehippo.repository.lock.MutableLock;
......@@ -54,6 +58,8 @@ public class DbLockManager extends AbstractLockManager {
public static final String EXPIRED_BLOCKING_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE expirationTime<? AND (status='RUNNING' OR status='ABORT') FOR UPDATE";
public static final String ALL_LOCKED_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE status='RUNNING' OR status='ABORT'";
public static final String RESET_LOCK_STATEMENT = "UPDATE " + TABLE_NAME_LOCK + " SET " +
"lockOwner=NULL, " +
"lockThread=NULL, " +
......@@ -142,7 +148,7 @@ public class DbLockManager extends AbstractLockManager {
}
}
log.debug("Obtained a lock for '{}'", key);
return new MutableLock(key, clusterNodeId, threadName, lockTime);
return new MutableLock(key, clusterNodeId, threadName, lockTime, "RUNNING");
} catch (SQLException e) {
if (log.isDebugEnabled()) {
......@@ -210,6 +216,7 @@ public class DbLockManager extends AbstractLockManager {
abortStatement.setString(1, key);
int changed = abortStatement.executeUpdate();
if (changed == 0) {
// can happen because by another Thread or cluster node already stopped in the meantime
log.info("Cannot set status to abort for '{}' because no such lock present.", key);
return;
}
......@@ -228,4 +235,61 @@ public class DbLockManager extends AbstractLockManager {
}
}
@Override
protected boolean containsLock(final String key) throws LockException {
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement selectStatement = connection.prepareStatement(SELECT_STATEMENT);
selectStatement.setString(1, key);
ResultSet resultSet = selectStatement.executeQuery();
if (!resultSet.next()) {
log.debug("No database row found for lockKey '{}'", key);
return false;
}
final String status = resultSet.getString("status");
boolean locked = "RUNNING".equals(status) || "ABORT".equals(status);
log.debug("Found database row for '{}' with locked = {}", key, locked);
return locked;
} catch (SQLException e) {
String msg = String.format("Could not query for '%s'.", key);
if (log.isDebugEnabled()) {
log.info(msg, e);
} else {
log.info(msg + " : {}", e.toString());
}
throw new LockException(msg, e);
}
}
@Override
protected List<Lock> retrieveLocks() throws LockException {
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement selectStatement = connection.prepareStatement(ALL_LOCKED_STATEMENT);
ResultSet resultSet = selectStatement.executeQuery();
final List<Lock> locks = new ArrayList<>();
while (resultSet.next()) {
final String lockKey = resultSet.getString("lockKey");
final String lockOwner = resultSet.getString("lockOwner");
final String lockThread = resultSet.getString("lockThread");
final long lockTime = resultSet.getLong("lockTime");
final String status = resultSet.getString("status");
final Lock lock = new Lock(lockKey, lockOwner, lockThread, lockTime, status);
log.debug("Adding lock : {}", lock.toString());
locks.add(lock);
}
if (locks.size() == 0) {
log.debug("No locks found");
}
return locks;
} catch (SQLException e) {
String msg = String.format("Could retrieve locks");
if (log.isDebugEnabled()) {
log.info(msg, e);
} else {
log.info(msg + " : {}", e.toString());
}
throw new LockException(msg, e);
}
}
}
......@@ -15,6 +15,10 @@
*/
package org.onehippo.repository.lock.memory;
import java.util.ArrayList;
import java.util.List;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.repository.lock.AbstractLockManager;
import org.onehippo.repository.lock.MutableLock;
......@@ -36,7 +40,7 @@ public class MemoryLockManager extends AbstractLockManager {
@Override
protected MutableLock createLock(final String key, final String threadName, final int refreshRateSeconds) throws LockException {
return new MutableLock(key, "default", threadName, System.currentTimeMillis());
return new MutableLock(key, "default", threadName, System.currentTimeMillis(), "RUNNING");
}
@Override
......@@ -49,4 +53,13 @@ public class MemoryLockManager extends AbstractLockManager {
// no persistent lock needs to be aborted so nothing else is needed
}
@Override
protected synchronized boolean containsLock(final String key) throws LockException {
return localLocks.containsKey(key);
}
@Override
protected synchronized List<Lock> retrieveLocks() throws LockException {
return new ArrayList<>(localLocks.values());
}
}
......@@ -19,6 +19,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.jcr.Repository;
......@@ -31,6 +32,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onehippo.cms7.services.HippoServiceRegistry;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.onehippo.repository.journal.JournalConnectionHelperAccessor;
......@@ -116,7 +118,8 @@ public class LockManagerTest extends RepositoryTestCase {
final String key = "123";
lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
// for 'node1', see h2-repository.xml or mysql-repository.xml the <cluster> node id
dbRowAssertion(key, "RUNNING", "node1", Thread.currentThread().getName());
lockManager.lock(key);
......@@ -126,12 +129,9 @@ public class LockManagerTest extends RepositoryTestCase {
assertEquals(key, lockManager.getLocks().iterator().next().getLockKey());
assertEquals(Thread.currentThread().getName(), lockManager.getLocks().iterator().next().getLockThread());
assertEquals(2, ((MutableLock)lockManager.getLocks().iterator().next()).getHoldCount());
lockManager.unlock(key);
assertEquals(1, lockManager.getLocks().size());
assertEquals(1, ((MutableLock)lockManager.getLocks().iterator().next()).getHoldCount());
dbRowAssertion(key, "RUNNING");
......@@ -145,8 +145,12 @@ public class LockManagerTest extends RepositoryTestCase {
lockManager.lock(key);
dbRowAssertion(key, "RUNNING");
}
private void dbRowAssertion(final String key, final String expectedStatus) throws SQLException {
dbRowAssertion(key, expectedStatus, null, null);
}
private void dbRowAssertion(final String key, final String expectedStatus, final String lockOwnerExpectation, final String lockThreadExpectation) throws SQLException {
if (dataSource == null) {
// not a clustered db test
return;
......@@ -157,8 +161,13 @@ public class LockManagerTest extends RepositoryTestCase {
selectStatement.setString(1, key);
ResultSet resultSet = selectStatement.executeQuery();
if (resultSet.next()) {
String status = resultSet.getString("status");
assertEquals(expectedStatus, status);
assertEquals(expectedStatus, resultSet.getString("status"));
if (lockOwnerExpectation != null) {
assertEquals(lockOwnerExpectation, resultSet.getString("lockOwner"));
}
if (lockThreadExpectation != null) {
assertEquals(lockThreadExpectation, resultSet.getString("lockThread"));
}
} else {
fail(String.format("A row with lockKey '%s' should exist", key));
}
......@@ -275,6 +284,8 @@ public class LockManagerTest extends RepositoryTestCase {
// give lockThread time to lock
Thread.sleep(100);
dbRowAssertion(key, "RUNNING", "node1", lockThread.getName());
try {
lockManager.unlock(key);
fail("Main thread should not be able to unlock");
......@@ -332,7 +343,7 @@ public class LockManagerTest extends RepositoryTestCase {
addManualLockToDatabase("c", "otherNode", "otherThreadName", 60);
addManualLockToDatabase("d", "otherNode", "otherThreadName", 60);
dbRowAssertion("a", "RUNNING");
dbRowAssertion("a", "RUNNING", "node1", Thread.currentThread().getName());
dbRowAssertion("b", "RUNNING");
dbRowAssertion("c", "RUNNING");
dbRowAssertion("d", "RUNNING");
......@@ -393,4 +404,42 @@ public class LockManagerTest extends RepositoryTestCase {
}
}
}
@Test
public void get_locks_returns_also_locks_owned_by_other_cluster_node_in_case_of_clustered_setup() throws Exception {
lockManager.lock("a");
lockManager.lock("b");
// insert manually non-owned rows
addManualLockToDatabase("c", "otherNode", "otherThreadName", 60);
addManualLockToDatabase("d", "otherNode", "otherThreadName", 60);
List<Lock> locks = lockManager.getLocks();
if (dataSource == null) {
// in memory manager
assertEquals(2, locks.size());
} else {
dbRowAssertion("a", "RUNNING", "node1", Thread.currentThread().getName());
dbRowAssertion("c", "RUNNING", "otherNode", "otherThreadName");
assertEquals(4, locks.size());
}
}
@Test
public void is_lock_also_checks_locks_owned_by_other_cluster_nodes_in_case_of_clustered_setup() throws Exception {
lockManager.lock("a");
lockManager.lock("b");
// insert manually non-owned rows
assertTrue(lockManager.isLocked("a"));
assertTrue(lockManager.isLocked("b"));
if (dataSource != null) {
addManualLockToDatabase("c", "otherNode", "otherThreadName", 60);
addManualLockToDatabase("d", "otherNode", "otherThreadName", 60);
assertTrue(lockManager.isLocked("c"));
assertTrue(lockManager.isLocked("d"));
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment