code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 8ddbbc21 authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Use the new LockManager instead of JCR Locking

parent d85ce35b
/*
* Copyright 2012-2013 Hippo B.V. (http://www.onehippo.com)
* Copyright 2012-2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,7 +19,6 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jcr.Node;
......@@ -28,22 +27,20 @@ import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
import javax.jcr.lock.Lock;
import javax.jcr.lock.LockException;
import javax.jcr.lock.LockManager;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import org.hippoecm.repository.api.HippoNodeType;
import org.onehippo.cms7.services.HippoServiceRegistry;
import org.onehippo.repository.locking.HippoLockManager;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.onehippo.repository.modules.DaemonModule;
import org.onehippo.repository.modules.ProvidesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ProvidesService(types = { NodeUpdaterService.class })
@ProvidesService(types = {NodeUpdaterService.class})
public class UpdaterExecutionModule implements DaemonModule, EventListener {
private static final Logger log = LoggerFactory.getLogger(UpdaterExecutionModule.class);
......@@ -51,13 +48,12 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
private static final String UPDATE_PATH = "/" + HippoNodeType.CONFIGURATION_PATH + "/hippo:update";
private static final String UPDATE_QUEUE_PATH = UPDATE_PATH + "/hippo:queue";
private static final String UPDATE_HISTORY_PATH = UPDATE_PATH + "/hippo:history";
private static final long TWO_MINUTES = 60 * 2;
private static final String DEFAULT_CLUSTER_NODE_ID = "default";
private final ExecutorService updaterExecutor = Executors.newSingleThreadExecutor();
private Session session;
private UpdaterRegistry updaterRegistry;
private NodeUpdaterService updaterService;
private LockManager lockManager;
private volatile ExecuteUpdatersTask task;
private final Object monitor = new Object();
......@@ -65,7 +61,7 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
public void initialize(final Session session) throws RepositoryException {
this.session = session;
session.getWorkspace().getObservationManager().addEventListener(this, Event.NODE_ADDED, UPDATE_QUEUE_PATH, false, null, null, false);
updaterRegistry = new UpdaterRegistry(session.impersonate(new SimpleCredentials("system", new char[] {})));
updaterRegistry = new UpdaterRegistry(session.impersonate(new SimpleCredentials("system", new char[]{})));
updaterRegistry.start();
updaterService = new NodeUpdaterService() {
......@@ -93,6 +89,8 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
}
};
HippoServiceRegistry.registerService(updaterService, NodeUpdaterService.class);
lockManager = HippoServiceRegistry.getService(LockManager.class);
// check if any updaters are queued and execute them on startup
runExecuteUpdatersTask();
}
......@@ -135,28 +133,29 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
private final class ExecuteUpdatersTask implements Runnable {
private Lock lock;
private ScheduledExecutorService keepAliveExecutor;
private UpdaterExecutor updaterExecutor;
private volatile boolean cancelled;
private ExecuteUpdatersTask() {}
private ExecuteUpdatersTask() {
}
@Override
public void run() {
boolean locked = false;
try {
if (lock()) {
startLockKeepAlive();
executeUpdatersInQueue();
}
} catch (RepositoryException e) {
log.error("Failed to obtain lock", e);
lockManager.lock(UPDATE_PATH);
locked = true;
executeUpdatersInQueue();
} catch (LockException e) {
log.info("Failed to obtain lock, most likely obtained by other cluster node already", e);
} finally {
synchronized (monitor) {
task = null;
}
stopLockKeepAlive();
unlock();
if (locked) {
lockManager.unlock(UPDATE_PATH);
}
}
}
......@@ -169,24 +168,6 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
}
}
private boolean lock() throws RepositoryException {
log.debug("Trying to obtain lock");
final HippoLockManager lockManager = (HippoLockManager) session.getWorkspace().getLockManager();
if (!lockManager.isLocked(UPDATE_PATH) || lockManager.expireLock(UPDATE_PATH)) {
try {
lock = lockManager.lock(UPDATE_PATH, false, false, TWO_MINUTES, getClusterNodeId());
log.debug("Lock successfully obtained");
return true;
} catch (LockException e) {
// happens when other cluster node beat us to it
log.debug("Failed to set lock: " + e.getMessage());
}
} else {
log.debug("Already locked");
}
return false;
}
private void executeUpdatersInQueue() {
Node updaterNode;
while (!cancelled && (updaterNode = getNextUpdaterNodeFromQueue()) != null) {
......@@ -198,7 +179,7 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
private void executeUpdater(final Node updaterNode) {
Session session = null;
try {
session = UpdaterExecutionModule.this.session.impersonate(new SimpleCredentials("system", new char[] {}));
session = UpdaterExecutionModule.this.session.impersonate(new SimpleCredentials("system", new char[]{}));
updaterExecutor = new UpdaterExecutor(updaterNode, session);
updaterExecutor.execute();
} catch (IOException e) {
......@@ -218,26 +199,6 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
}
}
private void unlock() {
log.debug("Trying to release lock");
try {
final LockManager lockManager = session.getWorkspace().getLockManager();
if (lockManager.isLocked(UPDATE_PATH)) {
final Lock lock = lockManager.getLock(UPDATE_PATH);
if (lock.isLockOwningSession()) {
lockManager.unlock(UPDATE_PATH);
log.debug("Lock successfully released");
} else {
log.debug("We don't own the lock");
}
} else {
log.debug("Not locked");
}
} catch (RepositoryException e) {
log.error("Failed to release lock", e);
}
}
private void moveToHistory(final Node node) {
try {
// the updater node was modified externally by the executor
......@@ -258,43 +219,6 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
}
}
private String getClusterNodeId() {
String clusteNodeId = session.getRepository().getDescriptor("jackrabbit.cluster.id");
if (clusteNodeId == null) {
clusteNodeId = DEFAULT_CLUSTER_NODE_ID;
}
return clusteNodeId;
}
private void startLockKeepAlive() {
keepAliveExecutor = Executors.newSingleThreadScheduledExecutor();
keepAliveExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
refreshLock();
}
}, 1l, 1l, TimeUnit.MINUTES);
}
private void stopLockKeepAlive() {
if (keepAliveExecutor != null) {
keepAliveExecutor.shutdown();
keepAliveExecutor = null;
}
}
private void refreshLock() {
try {
lock.refresh();
log.debug("Lock successfully refreshed");
} catch(LockException e) {
log.error("Failed to refresh lock", e);
updaterExecutor.cancel();
} catch (RepositoryException e) {
log.error("Failed to refresh lock", e);
}
}
public Node getNextUpdaterNodeFromQueue() {
NodeIterator nodes;
try {
......@@ -303,7 +227,7 @@ public class UpdaterExecutionModule implements DaemonModule, EventListener {
log.debug("Updater queue is empty. Nothing to execute");
return null;
}
} catch(PathNotFoundException e) {
} catch (PathNotFoundException e) {
log.debug("No updater queue");
return null;
} catch (RepositoryException e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment