code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit e0f19fab authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Support for a Database lock manager

The Database lock manager interaction is pretty much exactly the same
as the MemoryLockManager interaction. The biggest difference is that
the DbLockManager creates database based locks and the MemoryLockManager
creates MemoryLocks. Note that a DbLock is the same as the MemoryLock
but only contains a #destroy implementation that releases the database
lock.

The DbLockManager is still work in progress
parent 4750450f
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import java.lang.ref.WeakReference;
import org.onehippo.cms7.services.lock.Lock;
abstract class AbstractLock extends Lock {
private WeakReference<Thread> thread;
int holdCount;
public AbstractLock(final String lockKey, final String name, final long creationTime) {
super(lockKey, name, creationTime);
thread = new WeakReference<>(Thread.currentThread());
holdCount = 1;
}
public WeakReference<Thread> getThread() {
return thread;
}
public void setThread(final WeakReference<Thread> thread) {
this.thread = thread;
}
public void increment() {
holdCount++;
}
public void decrement() {
holdCount--;
}
abstract void destroy();
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.slf4j.Logger;
public abstract class AbstractLockManager implements LockManager {
private final Map<String, AbstractLock> locks = new HashMap();
abstract Logger getLogger();
abstract AbstractLock createLock(final String key) throws LockException;
@Override
public synchronized void lock(final String key) throws LockException {
final AbstractLock abstractLock = locks.get(key);
if (abstractLock == null) {
getLogger().debug("Create lock '{}' for thread '{}'", key, Thread.currentThread().getName());
locks.put(key, createLock(key));
return;
}
final Thread lockThread = abstractLock.getThread().get();
if (lockThread == null) {
getLogger().warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Thread '{}' " +
"now gets the lock", abstractLock.getLockOwner(), key, Thread.currentThread().getName());
abstractLock.setThread(new WeakReference<>(Thread.currentThread()));
return;
}
if (lockThread == Thread.currentThread()) {
getLogger().debug("Thread '{}' already contains lock '{}', increase hold count", Thread.currentThread().getName(), key);
abstractLock.increment();
return;
}
throw new LockException(String.format("This thread '%s' cannot lock '%s' : already locked by thread '%s'",
Thread.currentThread().getName(), key, lockThread.getName()));
}
@Override
public synchronized void unlock(final String key) throws LockException {
final AbstractLock abstractLock = locks.get(key);
if (abstractLock == null) {
getLogger().debug("No lock present for '{}'", key);
return;
}
final Thread lockThread = abstractLock.getThread().get();
if (lockThread == null) {
getLogger().warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Removing lock now",
abstractLock.getLockOwner(), key, Thread.currentThread().getName());
locks.remove(key);
}
if (lockThread != Thread.currentThread()) {
throw new LockException(String.format("Thread '%s' cannot unlock '%s' because lock owned by '%s'", Thread.currentThread().getName(), key,
lockThread.getName()));
}
abstractLock.decrement();
if (abstractLock.holdCount < 0) {
getLogger().error("Hold count of lock should never be able to be less than 0. Core implementation issue in {}. Remove " +
"lock for {} nonetheless.",
this.getClass().getName(), key);
locks.remove(key);
} else if (abstractLock.holdCount == 0) {
getLogger().debug("Remove lock '{}'", key);
locks.remove(key);
} else {
getLogger().debug("Lock '{}' will not be removed since hold count is '{}'", key, abstractLock.holdCount);
}
}
@Override
public synchronized boolean isLocked(final String key) throws LockException {
expungeNeverUnlockedLocksFromGCedThreads();
return locks.containsKey(key);
}
@Override
public synchronized List<Lock> getLocks() {
expungeNeverUnlockedLocksFromGCedThreads();
return new ArrayList<>(locks.values());
}
@Override
public void destroy() {
Iterator<Map.Entry<String, AbstractLock>> iterator = locks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, AbstractLock> next = iterator.next();
getLogger().warn("Lock '{}' owned by '{}' was never unlocked. Removing the lock now.", next.getKey(), next.getValue().getLockOwner());
next.getValue().destroy();
iterator.remove();
}
}
private void expungeNeverUnlockedLocksFromGCedThreads() {
Iterator<Map.Entry<String, AbstractLock>> iterator = locks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, AbstractLock> next = iterator.next();
if (next.getValue().getThread().get() == null) {
getLogger().warn("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock does not exist any more. " +
"Removing the lock now", next.getKey(), next.getValue().getLockOwner());
next.getValue().destroy();
iterator.remove();
}
}
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import java.util.List;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
public class AssertingLockManager implements LockManager {
private LockManager delegatee;
public AssertingLockManager(final LockManager delegatee) {
this.delegatee = delegatee;
}
@Override
public void lock(final String key) throws LockException {
if (key == null || key.length() > 256) {
throw new IllegalArgumentException("Key is not allowed to be null or longer than 256 chars");
}
delegatee.lock(key);
}
@Override
public void unlock(final String key) throws LockException {
if (key == null || key.length() > 256) {
throw new IllegalArgumentException("Key is not allowed to be null or longer than 256 chars");
}
delegatee.unlock(key);
}
@Override
public boolean isLocked(final String key) throws LockException {
if (key == null || key.length() > 256) {
throw new IllegalArgumentException("Key is not allowed to be null or longer than 256 chars");
}
return delegatee.isLocked(key);
}
@Override
public List<Lock> getLocks() {
return delegatee.getLocks();
}
@Override
public void destroy() {
delegatee.destroy();
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class DbLock extends AbstractLock {
private static final Logger log = LoggerFactory.getLogger(DbLock.class);
ResultSet dbLockResult;
DbLock(final String lockKey, final ResultSet dbLockSet) {
super(lockKey, Thread.currentThread().getName(), System.currentTimeMillis());
this.dbLockResult = dbLockSet;
}
@Override
void destroy() {
try {
dbLockResult.close();
} catch (SQLException e) {
log.error("Error while destroying DbLock", e);
}
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.onehippo.services.lock.db.DbHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbLockManager extends AbstractLockManager implements LockManager {
private static final Logger log = LoggerFactory.getLogger(DbLockManager.class);
final static String TABLE_NAME_LOCK = "hippolock";
final static String TABLE_NAME_LOCK_OVERVIEW = "hippolockoverview";
final static String CREATE_LOCK_TABLE_STATEMENT = "CREATE TABLE %s (lockKey VARCHAR(256) NOT NULL)";
// TODO for oracle it must be NUMBER instead of BIGINT
final static String CREATE_LOCK_OVERVIEW_TABLE_STATEMENT = "CREATE TABLE %s (lockKey VARCHAR(256) NOT NULL, lockTime BIGINT NOT NULL)";
private DataSource dataSource;
private final Map<String, DbLock> locks = new HashMap();
public DbLockManager(final DataSource dataSource) {
this.dataSource = dataSource;
DbHelper.createTableIfNeeded(dataSource, CREATE_LOCK_TABLE_STATEMENT, TABLE_NAME_LOCK);
DbHelper.createTableIfNeeded(dataSource, CREATE_LOCK_OVERVIEW_TABLE_STATEMENT, TABLE_NAME_LOCK_OVERVIEW);
try {
lock("foo");
} catch (LockException e) {
e.printStackTrace();
}
}
@Override
Logger getLogger() {
return log;
}
@Override
AbstractLock createLock(final String key) throws LockException {
final String lockStatement = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockKey=? FOR UPDATE NOWAIT";
final String insertStatement = "INSERT INTO " + TABLE_NAME_LOCK + " VALUES(?)";
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement preparedLockStatement = connection.prepareStatement(lockStatement);
preparedLockStatement.setString(1, key);
preparedLockStatement.setQueryTimeout(10);
// the lockResultSet must no be in the autoclosable 'try' because we need to keep it open
ResultSet lockResultSet = preparedLockStatement.executeQuery();
if (!lockResultSet.next()) {
// entry did not yet exist, we need to add an entry first
lockResultSet.close();
final PreparedStatement preparedInsertstatement = connection.prepareStatement(insertStatement);
preparedInsertstatement.setString(1, key);
try {
preparedInsertstatement.execute();
connection.commit();
} catch (SQLException e) {
// entry can already be created concurrently by other cluster node. We can still try to get the lock
// now
System.out.println(e);
}
lockResultSet = preparedLockStatement.executeQuery();
if (!lockResultSet.next()) {
String msg = String.format("Unexpected : A row for '%s' was expected and if it could not be locked, an SQL Exception was " +
"expected.", key);
log.error(msg);
throw new LockException(msg);
}
}
// push the lockResultSet in a cache such that it cannot be GC-ed and thus not closed as a result of being GC-ed
return new DbLock(key, lockResultSet);
} catch (SQLException e) {
// TODO
log.error(e.toString());
throw new LockException(e);
}
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
import javax.jcr.RepositoryException;
import javax.sql.DataSource;
import org.apache.jackrabbit.core.util.db.ConnectionHelper;
import org.apache.jackrabbit.core.util.db.ConnectionHelperDataSourceAccessor;
import org.hippoecm.repository.jackrabbit.RepositoryImpl;
import org.onehippo.cms7.services.lock.LockManager;
......@@ -10,17 +28,23 @@ public class LockManagerFactory {
private RepositoryImpl repositoryImpl;
public LockManagerFactory(final RepositoryImpl repositoryImpl) {
this.repositoryImpl = repositoryImpl;
}
/**
* Creates the {@link LockManager} which can be used for general purpose locking *not* using JCR at all
*
* @throws RuntimeException if the lock manager cannot be created, resulting the repository startup to
* short-circuit
* @throws RuntimeException if the lock manager cannot be created, resulting the repository startup to short-circuit
* @throws RepositoryException if a repository exception happened while creating the lock manager
*/
public LockManager create() throws RuntimeException, RepositoryException {
return new MemoryLockManager();
final ConnectionHelper journalConnectionHelper = repositoryImpl.getJournalConnectionHelperAccessor().getConnectionHelper();
if (journalConnectionHelper != null) {
final DataSource dataSource = ConnectionHelperDataSourceAccessor.getDataSource(journalConnectionHelper);
return new AssertingLockManager(new DbLockManager(dataSource));
} else {
return new AssertingLockManager(new MemoryLockManager());
}
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock;
class MemoryLock extends AbstractLock {
public MemoryLock(final String lockKey) {
super(lockKey, Thread.currentThread().getName(), System.currentTimeMillis());
}
@Override
public void destroy() {
}
}
......@@ -15,130 +15,22 @@
*/
package org.onehippo.services.lock;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.onehippo.cms7.services.lock.Lock;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MemoryLockManager implements LockManager {
public class MemoryLockManager extends AbstractLockManager implements LockManager {
private static final Logger log = LoggerFactory.getLogger(MemoryLockManager.class);
private final Map<String, MemoryLock> locks = new HashMap();
@Override
public synchronized void lock(final String key) throws LockException {
final MemoryLock memoryLock = locks.get(key);
if (memoryLock == null) {
log.debug("Create lock '{}' for thread '{}'", key, Thread.currentThread().getName());
locks.put(key, new MemoryLock(key));
return;
}
final Thread lockThread = memoryLock.thread.get();
if (lockThread == null) {
log.warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Thread '{}' " +
"now gets the lock", memoryLock.getLockOwner(), key, Thread.currentThread().getName());
memoryLock.thread = new WeakReference<>(Thread.currentThread());
return;
}
if (lockThread == Thread.currentThread()) {
log.debug("Thread '{}' already contains lock '{}', increase hold count", Thread.currentThread().getName(), key);
memoryLock.increment();
return;
}
throw new LockException(String.format("This thread '%s' cannot lock '%s' : already locked by thread '%s'",
Thread.currentThread().getName(), key, lockThread.getName()));
}
@Override
public synchronized void unlock(final String key) throws LockException {
final MemoryLock memoryLock = locks.get(key);
if (memoryLock == null) {
log.debug("No lock present for '{}'", key);
return;
}
final Thread lockThread = memoryLock.thread.get();
if (lockThread == null) {
log.warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Removing lock now",
memoryLock.getLockOwner(), key, Thread.currentThread().getName());
locks.remove(key);
}
if (lockThread != Thread.currentThread()) {
throw new LockException(String.format("Thread '%s' cannot unlock '%s' because lock owned by '%s'", Thread.currentThread().getName(), key,
lockThread.getName()));
}
memoryLock.decrement();
if (memoryLock.holdCount < 0) {
log.error("Hold count of lock should never be able to be less than 0. Core implementation issue in {}. Remove " +
"lock for {} nonetheless.",
this.getClass().getName(), key);
locks.remove(key);
} else if (memoryLock.holdCount == 0) {
log.debug("Remove lock '{}'", key);
locks.remove(key);
} else {
log.debug("Lock '{}' will not be removed since hold count is '{}'", key, memoryLock.holdCount);
}
Logger getLogger() {
return log;
}
@Override
public synchronized boolean isLocked(final String key) throws LockException {
expungeNeverUnlockedLocksFromGCedThreads();
return locks.containsKey(key);
}
@Override
public synchronized List<Lock> getLocks() {
expungeNeverUnlockedLocksFromGCedThreads();
return new ArrayList<>(locks.values());
}
@Override
public void destroy() {
Iterator<Map.Entry<String, MemoryLock>> iterator = locks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, MemoryLock> next = iterator.next();
log.warn("Lock '{}' owned by '{}' was never unlocked. Removing the lock now.", next.getKey(), next.getValue().getLockOwner());
}
}
private void expungeNeverUnlockedLocksFromGCedThreads() {
Iterator<Map.Entry<String, MemoryLock>> iterator = locks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, MemoryLock> next = iterator.next();
if (next.getValue().thread.get() == null) {
log.warn("Lock '{}' with lockOwner '{}' was present but the Thread that created the lock does not exist any more. " +
"Removing the lock now", next.getKey(), next.getValue().getLockOwner());
iterator.remove();
}
}
}
class MemoryLock extends Lock {
private WeakReference<Thread> thread;
int holdCount;
public MemoryLock(final String lockKey) {
super(lockKey, Thread.currentThread().getName(), System.currentTimeMillis());
thread = new WeakReference<>(Thread.currentThread());
holdCount = 1;
}
public void increment() {
holdCount++;
}
public void decrement() {
holdCount--;
}
AbstractLock createLock(final String key) throws LockException {
return new MemoryLock(key);
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.services.lock.db;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbHelper {
private static final Logger log = LoggerFactory.getLogger(DbHelper.class);
/**
* Creates the table {@code tableName} and throws a {@link RuntimeException} if it does not succeed in it. Note that
* if in the meantime another cluster node has created the table, this method does not throw an exception but just
* returns.
* @param dataSource
* @param tableName
*/
public static void createTableIfNeeded(final DataSource dataSource, final String createTableStatement, final String tableName) throws RuntimeException {
try {
try (Connection connection = dataSource.getConnection()) {
final boolean tableExists = tableExists(connection, tableName);
if (!tableExists) {
log.info("Creating table {} ", tableName);
try (Statement statement = connection.createStatement()) {
statement.addBatch(String.format(createTableStatement, tableName));
statement.addBatch("CREATE UNIQUE INDEX " + tableName + "_idx_1 on " + tableName + "(lockkey)");
statement.setQueryTimeout(10);
statement.executeBatch();
} catch (SQLException e) {
if (tableExists(connection, tableName)) {
log.debug("Table {} already created by another cluster node", tableName);
} else {
log.error("Failed to create table {}: {}", tableName, e.getMessage());
throw e;
}
}
}
}
} catch (SQLException e) {
log.error("Could not get a connection or could not create table");
throw new RuntimeException("Could not get a connection not create table", e);
}
}
public static boolean tableExists(final Connection connection, final String tableName) throws SQLException {