Package ch.cern.dirq
Class QueueSimple
java.lang.Object
ch.cern.dirq.QueueSimple
QueueSimple - object oriented interface to a simple directory based queue.
A port of Perl module Directory::Queue::Simple http://search.cpan.org/dist/Directory-Queue/
The documentation from Directory::Queue::Simple module has been adapted for Java.
Compared to normal directory queue, this module:
The names of the intermediate directories are time based: the element insertion time is used to create a 8-digits long hexadecimal number. The granularity (see the constructor) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.
Since there is usually a filesystem limit in the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep small directories. However, in this case, you will create many directories and this will limit the total number of elements you can store.
The elements themselves are stored in files (one per element) with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:
A temporary element (being added to the queue) will have a
A locked element will have a hard link with the same name and the
Please refer to
A port of Perl module Directory::Queue::Simple http://search.cpan.org/dist/Directory-Queue/
The documentation from Directory::Queue::Simple module has been adapted for Java.
Usage
// sample producer
QueueSimple dirq = new QueueSimple("/tmp/test");
for (int i=0; i < 100; i++) {
String name = dirq.add("element " + i);
System.out.println("# added element " + i + " as " + name);
}
// sample consumer
dirq = QueueSimple("/tmp/test");
for (String name: dirq) {
if (! dirq.lock(name)) {
continue;
}
System.out.println("# reading element " + name);
String data = dirq.get(name);
// one could use dirq.unlock(name) to only browse the queue...
dirq.remove(name);
}
Description
This module is very similar to the normal directory queue, but uses a different way to store data in the filesystem, using less directories. Its API is almost identical.Compared to normal directory queue, this module:
- is simpler
- is faster
- uses less space on disk
- can be given existing files to store
- does not support schemas
- can only store and retrieve byte strings
- is not compatible (at filesystem level) with the normal directory queue
Directory Structure
The toplevel directory contains intermediate directories that contain the stored elements, each of them in a file.The names of the intermediate directories are time based: the element insertion time is used to create a 8-digits long hexadecimal number. The granularity (see the constructor) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.
Since there is usually a filesystem limit in the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep small directories. However, in this case, you will create many directories and this will limit the total number of elements you can store.
The elements themselves are stored in files (one per element) with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:
- SSSSSSSS represents the number of seconds since the Epoch
- MMMMM represents the microsecond part of the time since the Epoch
- R is a random hexadecimal digit used to reduce name collisions
A temporary element (being added to the queue) will have a
.tmp
suffix.
A locked element will have a hard link with the same name and the
.lck
suffix.
Please refer to
Queue
for general information about
directory queues.-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprivate static class
FileFilter class to iterate over temporary or locked elements.private static class
FileFilter class to iterate over (normal) elements.private static class
FileFilter class to iterate over intermediate directories.private static class
Iterator for the simple directory queue (private). -
Field Summary
FieldsModifier and TypeFieldDescriptionprivate static final int
private static final int
private static final int
static final Pattern
private Set<PosixFilePermission>
private static final FileFilter
private static final FileFilter
static final Pattern
private Set<PosixFilePermission>
private int
private static final FileFilter
static final String
private static final org.slf4j.Logger
private static final int
private static final int
private static final long
private static final int
private static final int
private static final long
private int
private int
private String
private String
private static Random
private int
private static final long
static final String
private int
-
Constructor Summary
ConstructorsConstructorDescriptionQueueSimple
(String path) Constructor creating a simple directory queue from the given path.QueueSimple
(String path, int numask) Constructor creating a simple directory queue from the given path and umask. -
Method Summary
Modifier and TypeMethodDescriptionadd
(byte[] data) Add byte array data to the queue.Add String data to the queue.private Path
addDataHelper
(String dir, byte[] data) private Path
addDataHelper
(String dir, String data) Add the given file (identified by its path) to the queue and return the corresponding element name, the file must be on the same filesystem and will be moved to the queue.private String
addPathHelper
(Path tmp, String dir) int
count()
Return the number of elements in the queue.private Path
createFile
(String path) private String
private static Set<PosixFilePermission>
directoryPerms
(int numask) private static String
elementName
(int rnd) private void
ensureDirectory
(Path path) private static Set<PosixFilePermission>
filePerms
(int numask) Get the given locked element as String data.byte[]
getAsByteArray
(String name) Get the given locked element as byte array data.int
Get the granularity.getId()
Return a unique identifier for the queue.int
Get the default maxLock for purge().int
Get the default maxTemp for purge().private Path
getNewPath
(String dir) Get the path of the given locked element.Return the path of the queue.int
Get the random hexadecimal digit.int
getUmask()
Get the umask.iterator()
Iterator for the simple directory queue.boolean
Lock an element in permissive mode.boolean
Lock an element.void
purge()
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
purge
(int maxLock) Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
purge
(int maxLock, int maxTemp) Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
Remove a locked element from the queue.setGranularity
(int value) Set the granularity.setMaxLock
(int value) Set the default maxLock for purge().setMaxTemp
(int value) Set the default maxTemp for purge().setRndHex
(int value) Set the random hexadecimal digit.setUmask
(int value) Set the umask.private boolean
boolean
Unlock an element in non-permissive mode.boolean
Unlock an element.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface java.lang.Iterable
forEach, spliterator
-
Field Details
-
logger
private static final org.slf4j.Logger logger -
TEMPORARY_SUFFIX
- See Also:
-
LOCKED_SUFFIX
- See Also:
-
DIRECTORY_REGEXP
-
ELEMENT_REGEXP
-
DEFAULT_GRANULARITY
private static final int DEFAULT_GRANULARITY- See Also:
-
DEFAULT_MAXLOCK
private static final int DEFAULT_MAXLOCK- See Also:
-
DEFAULT_MAXTEMP
private static final int DEFAULT_MAXTEMP- See Also:
-
MAX_RNDHEX
private static final int MAX_RNDHEX- See Also:
-
MAX_UMASK
private static final int MAX_UMASK- See Also:
-
MAX_DIRECTORY_UMASK
private static final int MAX_DIRECTORY_UMASK- See Also:
-
MAX_FILE_UMASK
private static final int MAX_FILE_UMASK- See Also:
-
SECOND
private static final long SECOND- See Also:
-
NANO2MICRO
private static final long NANO2MICRO- See Also:
-
MAX_MICRO
private static final long MAX_MICRO- See Also:
-
INTERMEDIATE_DIRECTORY_FF
-
ELEMENT_FF
-
DOT_ELEMENT_FF
-
rand
-
granularity
private int granularity -
qMaxLock
private int qMaxLock -
qMaxTemp
private int qMaxTemp -
rndHex
private int rndHex -
umask
private int umask -
queueId
-
queuePath
-
directoryPermissions
-
filePermissions
-
-
Constructor Details
-
QueueSimple
Constructor creating a simple directory queue from the given path.- Parameters:
path
- path of the directory queue- Throws:
IOException
- if any file operation fails
-
QueueSimple
Constructor creating a simple directory queue from the given path and umask.- Parameters:
path
- path of the directory queuenumask
- numerical umask of the directory queue- Throws:
IOException
- if any file operation fails
-
-
Method Details
-
getQueuePath
Description copied from interface:Queue
Return the path of the queue.- Specified by:
getQueuePath
in interfaceQueue
- Returns:
- queue path
-
getId
Description copied from interface:Queue
Return a unique identifier for the queue. -
add
Description copied from interface:Queue
Add String data to the queue.- Specified by:
add
in interfaceQueue
- Parameters:
data
- data to be added- Returns:
- element name (as directory_name/file_name)
- Throws:
IOException
- if any file operation fails
-
add
Description copied from interface:Queue
Add byte array data to the queue.- Specified by:
add
in interfaceQueue
- Parameters:
data
- data to be added- Returns:
- element name (as directory_name/file_name)
- Throws:
IOException
- if any file operation fails
-
addPath
Description copied from interface:Queue
Add the given file (identified by its path) to the queue and return the corresponding element name, the file must be on the same filesystem and will be moved to the queue.- Specified by:
addPath
in interfaceQueue
- Parameters:
path
- path of the file to be added- Returns:
- element name (as directory_name/file_name)
- Throws:
IOException
- if any file operation fails
-
get
Description copied from interface:Queue
Get the given locked element as String data.- Specified by:
get
in interfaceQueue
- Parameters:
name
- name of the element to be retrieved- Returns:
- data associated with the given element
- Throws:
IOException
-
getAsByteArray
Description copied from interface:Queue
Get the given locked element as byte array data.- Specified by:
getAsByteArray
in interfaceQueue
- Parameters:
name
- name of the element to be retrieved- Returns:
- data associated with the given element
- Throws:
IOException
-
getPath
Description copied from interface:Queue
Get the path of the given locked element.
This pathFile can be read but not removed, you must use the remove() method for this purpose. -
lock
Description copied from interface:Queue
Lock an element in permissive mode.- Specified by:
lock
in interfaceQueue
- Parameters:
name
- name of the element to be locked- Returns:
true
on success,false
if the element could not be locked- Throws:
IOException
- if any file operation fails
-
lock
Description copied from interface:Queue
Lock an element.- Specified by:
lock
in interfaceQueue
- Parameters:
name
- name of the element to be lockedpermissive
- work in permissive mode- Returns:
true
on success,false
if the element could not be locked- Throws:
IOException
- if any file operation fails
-
unlock
Description copied from interface:Queue
Unlock an element in non-permissive mode.- Specified by:
unlock
in interfaceQueue
- Parameters:
name
- name of the element to be unlocked- Returns:
true
on success,false
if the element could not be unlocked- Throws:
IOException
- if any file operation fails
-
unlock
Description copied from interface:Queue
Unlock an element.- Specified by:
unlock
in interfaceQueue
- Parameters:
name
- name of the element to be unlockedpermissive
- work in permissive mode- Returns:
true
on success,false
if the element could not be unlocked- Throws:
IOException
- if any file operation fails
-
remove
Description copied from interface:Queue
Remove a locked element from the queue.- Specified by:
remove
in interfaceQueue
- Parameters:
name
- name of the element to be removed- Throws:
IOException
- if any file operation fails
-
count
public int count()Description copied from interface:Queue
Return the number of elements in the queue.
Locked elements are counted but temporary elements are not. -
purge
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
It uses default value for maxTemp and maxLock- Specified by:
purge
in interfaceQueue
- Throws:
IOException
- if any file operation fails
-
purge
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.- Specified by:
purge
in interfaceQueue
- Parameters:
maxLock
- maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be used- Throws:
IOException
- if any file operation fails
-
purge
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.- Specified by:
purge
in interfaceQueue
- Parameters:
maxLock
- maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be usedmaxTemp
- maximum time for a temporary element (in seconds); if set to 0, temporary elements will not be removed if set to null, the object's default value will be used- Throws:
IOException
- if any file operation fails
-
getGranularity
public int getGranularity()Get the granularity.- Returns:
- granularity (in seconds)
-
setGranularity
Set the granularity.- Parameters:
value
- granularity to be set (in seconds)- Returns:
- the object itself
-
getUmask
public int getUmask()Get the umask.- Returns:
- numerical umask
-
setUmask
Set the umask.- Parameters:
value
- umask to be set (numerical)- Returns:
- the object itself
-
getMaxLock
public int getMaxLock()Get the default maxLock for purge().- Returns:
- maximum lock time (in seconds)
-
setMaxLock
Set the default maxLock for purge().- Parameters:
value
- maximum lock time (in seconds)- Returns:
- the object itself
-
getMaxTemp
public int getMaxTemp()Get the default maxTemp for purge().- Returns:
- maximum temporary time (in seconds)
-
setMaxTemp
Set the default maxTemp for purge().- Parameters:
value
- maximum temporary time (in seconds)- Returns:
- the object itself
-
getRndHex
public int getRndHex()Get the random hexadecimal digit.- Returns:
- numerical hexadecimal digit
-
setRndHex
Set the random hexadecimal digit.- Parameters:
value
- hexadecimal digit to be set (numerical)- Returns:
- the object itself
-
directoryPerms
-
filePerms
-
directoryName
-
elementName
-
addPathHelper
- Throws:
IOException
-
createFile
- Throws:
IOException
-
getNewPath
- Throws:
IOException
-
addDataHelper
- Throws:
IOException
-
addDataHelper
- Throws:
IOException
-
ensureDirectory
- Throws:
IOException
-
touchFile
-
iterator
Iterator for the simple directory queue.
-