public class ElasticsearchCommitter extends AbstractBatchCommitter
Commits documents to Elasticsearch. This committer relies on the Elasticsearch REST API.
Elasticsearch expects a field named "_id" that uniquely identifies each document. You can provide that field yourself in the documents you submit. If you do not specify an "_id" field, this committer will create one for you, using the document reference as the identifier value.
By default the "body" of a document is read as an input stream
and stored in a "content" field. You can change that target field name
with setTargetContentField(String)
. If you set the target
content field to null
, it will effectively skip storing
the content stream.
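The behavior can be sketched as follows (illustrative Java only, not the committer's actual code; the class, method, and value names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the document body ends up under the target
// content field; a null target field name skips storing the content.
public class TargetContentFieldSketch {
    static Map<String, String> store(String targetContentField, String body) {
        Map<String, String> esDocument = new HashMap<>();
        if (targetContentField != null) {
            esDocument.put(targetContentField, body);
        }
        return esDocument;
    }

    public static void main(String[] args) {
        System.out.println(store("content", "Hello")); // {content=Hello}
        System.out.println(store(null, "Hello"));      // {} (content skipped)
    }
}
```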
Your Elasticsearch installation may treat dots in field names as representing "objects", which may not always be what you want. If dots are causing you issues, make sure not to submit fields with dots, or use setDotReplacement(String) to replace dots with a character of your choice (e.g., underscore). If your dot represents a nested object, keep reading.
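The effect of a dot replacement can be illustrated with plain string substitution (a sketch of the idea, not the library's implementation; the field name is made up):

```java
// Sketch: replacing dots in a metadata field name with an underscore,
// as setDotReplacement("_") would configure for submitted fields.
public class DotReplacementSketch {
    static String replaceDots(String fieldName, String dotReplacement) {
        return fieldName.replace(".", dotReplacement);
    }

    public static void main(String[] args) {
        System.out.println(replaceDots("author.name.first", "_"));
        // author_name_first
    }
}
```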
It is possible to provide a regular expression identifying one or more fields containing a JSON object rather than a regular string (setJsonFieldsPattern(String)). For example, this is a useful way to store nested objects. While very flexible, it can be challenging to come up with the right JSON structure. You may want to consider custom code.
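For illustration, a name pattern such as the hypothetical `.*Json` below would flag matching fields as holding JSON objects (the pattern and field names are made up, not defaults of this committer):

```java
import java.util.regex.Pattern;

// Sketch: deciding which fields hold JSON objects based on a field-name
// pattern, similar in spirit to setJsonFieldsPattern(".*Json").
public class JsonFieldsPatternSketch {
    static boolean isJsonField(String pattern, String fieldName) {
        return Pattern.matches(pattern, fieldName);
    }

    public static void main(String[] args) {
        System.out.println(isJsonField(".*Json", "authorJson")); // true
        System.out.println(isJsonField(".*Json", "title"));      // false
    }
}
```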
For this to work properly, make sure you define your Elasticsearch
field mappings on your index beforehand.
As of this writing, Elasticsearch 5 and higher have a 512-byte limitation on the "_id" field.
By default, an error (from Elasticsearch) will result from trying to submit documents with an invalid ID. You can get around this by setting setFixBadIds(boolean) to true. It will truncate references that are too long and append a hash code representing the truncated part. This approach is not 100% collision-free (uniqueness), but it should safely cover the vast majority of cases.
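The idea can be sketched as follows (illustrative only; the committer's actual truncation and hashing details may differ, and the limit here is counted in characters rather than bytes for simplicity):

```java
// Sketch of the "fix bad IDs" idea: truncate an over-long reference and
// append a hash code representing the truncated part. Not the library's
// exact algorithm.
public class FixBadIdSketch {
    static String fixBadId(String id, int maxLength) {
        if (id.length() <= maxLength) {
            return id;
        }
        // Reserve room for a separator and a hex hash (up to 8 chars).
        String kept = id.substring(0, maxLength - 12);
        String truncatedPart = id.substring(kept.length());
        return kept + "!" + Integer.toHexString(truncatedPart.hashCode());
    }

    public static void main(String[] args) {
        String fixed = fixBadId("x".repeat(600), 512);
        System.out.println(fixed.length() <= 512); // true
    }
}
```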
As of Elasticsearch 7.0, the index type has been deprecated. If you are using Elasticsearch 7.0 or higher, do not configure the typeName. Doing so may cause errors. The typeName is available only for backward compatibility for those using this Committer with older versions of Elasticsearch.
Basic authentication is supported for password-protected clusters. Passwords can be encrypted using EncryptionUtil (or the command-line "encrypt.bat" or "encrypt.sh" scripts, if those are available to you). In order for the password to be decrypted properly, you need to specify the encryption key used to encrypt it. The key can be obtained from a few supported locations. The combination of the password key "value" and "source" is used to properly locate the key.
The supported sources are:
| Source | Description |
|---|---|
| key | The actual encryption key. |
| file | Path to a file containing the encryption key. |
| environment | Name of an environment variable containing the key. |
| property | Name of a JVM system property containing the key. |
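A sketch of how a key might be resolved from each of these sources (illustrative Java, not the library's API; method and property names are made up):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch only: resolving an encryption key from the four supported
// source types described above.
public class PasswordKeySourceSketch {
    static String resolveKey(String source, String value) {
        switch (source) {
            case "key":
                return value;
            case "file":
                try {
                    return Files.readString(Path.of(value)).trim();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            case "environment":
                return System.getenv(value);
            case "property":
                return System.getProperty(value);
            default:
                throw new IllegalArgumentException("Unknown source: " + source);
        }
    }

    public static void main(String[] args) {
        System.setProperty("demo.key.prop", "s3cret");
        System.out.println(resolveKey("key", "s3cret"));           // s3cret
        System.out.println(resolveKey("property", "demo.key.prop")); // s3cret
    }
}
```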
You can specify timeout values for when this committer sends documents to Elasticsearch.
Optionally apply a committer only to certain types of documents. Documents are restricted based on their metadata field names and values. This option can be used to perform document routing when you have multiple committers defined.
By default, this abstract class applies field mappings to metadata fields, but leaves the document reference and content (input stream) for concrete implementations to handle. In other words, field mappings apply only to a committer request's metadata. Field mappings are performed on committer requests before upserts and deletes are actually performed.
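A field mapping can be sketched as a simple rename on the request metadata (illustrative only, not the library's implementation; field names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a field mapping: metadata stored under "fromField" is moved
// to "toField" before the request is committed.
public class FieldMappingSketch {
    static Map<String, String> applyMapping(
            Map<String, String> metadata, String fromField, String toField) {
        Map<String, String> mapped = new HashMap<>(metadata);
        if (mapped.containsKey(fromField)) {
            mapped.put(toField, mapped.remove(fromField));
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, String> meta = new HashMap<>();
        meta.put("dc.title", "My Title");
        System.out.println(applyMapping(meta, "dc.title", "title"));
        // {title=My Title}
    }
}
```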
<committer
class="com.norconex.committer.elasticsearch.ElasticsearchCommitter">
<nodes>
(Comma-separated list of Elasticsearch node URLs.
Defaults to http://localhost:9200)
</nodes>
<indexName>(Name of the index to use)</indexName>
<typeName>
(Name of the type to use. Deprecated since Elasticsearch v7.)
</typeName>
<ignoreResponseErrors>[false|true]</ignoreResponseErrors>
<discoverNodes>[false|true]</discoverNodes>
<dotReplacement>
(Optional value replacing dots in field names)
</dotReplacement>
<jsonFieldsPattern>
(Optional regular expression to identify fields containing JSON
objects instead of regular strings)
</jsonFieldsPattern>
<connectionTimeout>(milliseconds)</connectionTimeout>
<socketTimeout>(milliseconds)</socketTimeout>
<fixBadIds>
[false|true]
(Forces references to fit into the Elasticsearch "_id" field.)
</fixBadIds>
<!-- Use the following if authentication is required. -->
<credentials>
<username>(the username)</username>
<password>(the optionally encrypted password)</password>
<passwordKey>
<value>(The actual password encryption key or a reference to it.)</value>
<source>[key|file|environment|property]</source>
<size>(Size in bits of encryption key. Default is 128.)</size>
</passwordKey>
</credentials>
<sourceIdField>
(Optional document field name containing the value that will be stored
in Elasticsearch "_id" field. Default is the document reference.)
</sourceIdField>
<targetContentField>
(Optional Elasticsearch field name to store the document
content/body. Default is "content".)
</targetContentField>
<!-- multiple "restrictTo" tags allowed (only one needs to match) -->
<restrictTo>
<fieldMatcher
method="[basic|csv|wildcard|regex]"
ignoreCase="[false|true]"
ignoreDiacritic="[false|true]"
partial="[false|true]">
(field-matching expression)
</fieldMatcher>
<valueMatcher
method="[basic|csv|wildcard|regex]"
ignoreCase="[false|true]"
ignoreDiacritic="[false|true]"
partial="[false|true]">
(value-matching expression)
</valueMatcher>
</restrictTo>
<fieldMappings>
<!-- Add as many field mappings as needed -->
<mapping
fromField="(source field name)"
toField="(target field name)"/>
</fieldMappings>
<!-- Settings for default queue implementation ("class" is optional): -->
<queue
class="com.norconex.committer.core3.batch.queue.impl.FSQueue">
<batchSize>
(Optional number of documents queued after which we process a batch.
Default is 20.)
</batchSize>
<maxPerFolder>
(Optional maximum number of files or directories that can be queued
in a single folder before a new one gets created. Default is 500.)
</maxPerFolder>
<commitLeftoversOnInit>
(Optionally force committing any leftover documents from a previous
execution, e.g., one that ended prematurely. Default is "false".)
</commitLeftoversOnInit>
<onCommitFailure>
<splitBatch>[OFF|HALF|ONE]</splitBatch>
<maxRetries>(Max retries upon commit failures. Default is 0.)</maxRetries>
<retryDelay>
(Delay in milliseconds between retries. Default is 0.)
</retryDelay>
<ignoreErrors>
[false|true]
(When true, non-critical exceptions encountered while interacting with
the target repository are not thrown, in an attempt to continue the
execution with the other files to be committed. Instead, errors are
logged. In both cases the failing batch/files are moved to an
"error" folder. Other types of exceptions may still be thrown.)
</ignoreErrors>
</onCommitFailure>
</queue>
</committer>
XML configuration entries expecting millisecond durations
can be provided in human-readable format (English only), as per
DurationParser
(e.g., "5 minutes and 30 seconds" or "5m30s").
<committer
class="com.norconex.committer.elasticsearch.ElasticsearchCommitter">
<indexName>some_index</indexName>
</committer>
The above example uses the minimum required settings, on the local host.
| Modifier and Type | Field and Description |
|---|---|
| static int | DEFAULT_CONNECTION_TIMEOUT |
| static String | DEFAULT_ELASTICSEARCH_CONTENT_FIELD |
| static String | DEFAULT_NODE |
| static int | DEFAULT_SOCKET_TIMEOUT |
| static String | ELASTICSEARCH_ID_FIELD |
| Constructor and Description |
|---|
| ElasticsearchCommitter() |
| Modifier and Type | Method and Description |
|---|---|
| protected void | closeBatchCommitter() |
| protected void | commitBatch(Iterator<ICommitterRequest> it) |
| protected org.elasticsearch.client.RestClient | createRestClient() |
| protected org.elasticsearch.client.sniff.Sniffer | createSniffer(org.elasticsearch.client.RestClient client) |
| boolean | equals(Object other) |
| int | getConnectionTimeout() Gets Elasticsearch connection timeout. |
| Credentials | getCredentials() Gets Elasticsearch authentication credentials. |
| String | getDotReplacement() Gets the character used to replace dots in field names. |
| String | getIndexName() Gets the index name. |
| String | getJsonFieldsPattern() Gets the regular expression matching fields that contain a JSON object for their value (as opposed to a regular string). |
| List<String> | getNodes() Gets an unmodifiable list of Elasticsearch cluster node URLs. |
| int | getSocketTimeout() Gets Elasticsearch socket timeout. |
| String | getSourceIdField() Gets the document field name containing the value to be stored in the Elasticsearch "_id" field. |
| String | getTargetContentField() Gets the name of the Elasticsearch field where content will be stored. |
| String | getTypeName() Gets the type name. |
| int | hashCode() |
| protected void | initBatchCommitter() |
| boolean | isDiscoverNodes() Whether automatic discovery of Elasticsearch cluster nodes should be enabled. |
| boolean | isFixBadIds() Gets whether to fix IDs that are too long for the Elasticsearch ID limitation (512 bytes max). |
| boolean | isIgnoreResponseErrors() Whether to ignore response errors. |
| protected void | loadBatchCommitterFromXML(XML xml) |
| protected void | saveBatchCommitterToXML(XML xml) |
| void | setConnectionTimeout(int connectionTimeout) Sets Elasticsearch connection timeout. |
| void | setCredentials(Credentials credentials) Sets Elasticsearch authentication credentials. |
| void | setDiscoverNodes(boolean discoverNodes) Sets whether automatic discovery of Elasticsearch cluster nodes should be enabled. |
| void | setDotReplacement(String dotReplacement) Sets the character used to replace dots in field names. |
| void | setFixBadIds(boolean fixBadIds) Sets whether to fix IDs that are too long for the Elasticsearch ID limitation (512 bytes max). |
| void | setIgnoreResponseErrors(boolean ignoreResponseErrors) Sets whether to ignore response errors. |
| void | setIndexName(String indexName) Sets the index name. |
| void | setJsonFieldsPattern(String jsonFieldsPattern) Sets the regular expression matching fields that contain a JSON object for their value (as opposed to a regular string). |
| void | setNodes(List<String> nodes) Sets cluster node URLs. |
| void | setNodes(String... nodes) Sets cluster node URLs. |
| void | setSocketTimeout(int socketTimeout) Sets Elasticsearch socket timeout. |
| void | setSourceIdField(String sourceIdField) Sets the document field name containing the value to be stored in the Elasticsearch "_id" field. |
| void | setTargetContentField(String targetContentField) Sets the name of the Elasticsearch field where content will be stored. |
| void | setTypeName(String typeName) Sets the type name. |
| String | toString() |
Methods inherited from class AbstractBatchCommitter: consume, doClean, doClose, doDelete, doInit, doUpsert, getCommitterQueue, loadCommitterFromXML, saveCommitterToXML, setCommitterQueue
Methods inherited from class AbstractCommitter: accept, addRestriction, addRestrictions, applyFieldMappings, clean, clearFieldMappings, clearRestrictions, close, delete, fireDebug, fireDebug, fireError, fireError, fireInfo, fireInfo, getCommitterContext, getFieldMappings, getRestrictions, init, loadFromXML, removeFieldMapping, removeRestriction, removeRestriction, saveToXML, setFieldMapping, setFieldMappings, upsert
Methods inherited from class java.lang.Object: clone, finalize, getClass, notify, notifyAll, wait, wait, wait
Methods inherited from interface IXMLConfigurable: loadFromXML, saveToXML
public static final String ELASTICSEARCH_ID_FIELD
public static final String DEFAULT_ELASTICSEARCH_CONTENT_FIELD
public static final String DEFAULT_NODE
public static final int DEFAULT_CONNECTION_TIMEOUT
public static final int DEFAULT_SOCKET_TIMEOUT
public List<String> getNodes()
public void setNodes(String... nodes)
Parameters: nodes - Elasticsearch cluster nodes

public void setNodes(List<String> nodes)
Parameters: nodes - Elasticsearch cluster nodes

public String getTargetContentField()
public void setTargetContentField(String targetContentField)
A null value will disable storing the content.
Parameters: targetContentField - field name

public String getSourceIdField()

public void setSourceIdField(String sourceIdField)
Set to null to use the document reference instead of a field (default).
Parameters: sourceIdField - name of the field containing the id value, or null
public String getIndexName()

public void setIndexName(String indexName)
Parameters: indexName - the index name

public String getTypeName()
Returns: the type name, or null

public void setTypeName(String typeName)
May be null.
Parameters: typeName - type name

public String getJsonFieldsPattern()
Returns: the regular expression, or null

public void setJsonFieldsPattern(String jsonFieldsPattern)
Parameters: jsonFieldsPattern - regular expression

public boolean isIgnoreResponseErrors()
When true, the errors are logged instead.
Returns: true when ignoring response errors

public void setIgnoreResponseErrors(boolean ignoreResponseErrors)
When false, an exception is thrown if the Elasticsearch response contains an error. When true, the errors are logged instead.
Parameters: ignoreResponseErrors - true when ignoring response errors

public boolean isDiscoverNodes()
Returns: true if enabled

public void setDiscoverNodes(boolean discoverNodes)
Parameters: discoverNodes - true if enabled

public Credentials getCredentials()
public void setCredentials(Credentials credentials)
Parameters: credentials - the credentials

public String getDotReplacement()
Default is null (does not replace dots).
Returns: the replacement character, or null

public void setDotReplacement(String dotReplacement)
Parameters: dotReplacement - replacement character, or null
public int getConnectionTimeout()

public void setConnectionTimeout(int connectionTimeout)
Parameters: connectionTimeout - milliseconds

public int getSocketTimeout()

public void setSocketTimeout(int socketTimeout)
Parameters: socketTimeout - milliseconds

public boolean isFixBadIds()
When true, long IDs will be truncated and a hash code representing the truncated part will be appended.
Returns: true to fix IDs that are too long

public void setFixBadIds(boolean fixBadIds)
When true, long IDs will be truncated and a hash code representing the truncated part will be appended.
Parameters: fixBadIds - true to fix IDs that are too long

protected void initBatchCommitter() throws CommitterException
initBatchCommitter in class AbstractBatchCommitter
Throws: CommitterException

protected void commitBatch(Iterator<ICommitterRequest> it) throws CommitterException
commitBatch in class AbstractBatchCommitter
Throws: CommitterException

protected void closeBatchCommitter() throws CommitterException
closeBatchCommitter in class AbstractBatchCommitter
Throws: CommitterException
protected org.elasticsearch.client.RestClient createRestClient()
protected org.elasticsearch.client.sniff.Sniffer createSniffer(org.elasticsearch.client.RestClient client)
protected void saveBatchCommitterToXML(XML xml)
saveBatchCommitterToXML in class AbstractBatchCommitter

protected void loadBatchCommitterFromXML(XML xml)
loadBatchCommitterFromXML in class AbstractBatchCommitter
public boolean equals(Object other)
equals in class AbstractBatchCommitter

public int hashCode()
hashCode in class AbstractBatchCommitter

public String toString()
toString in class AbstractBatchCommitter
Copyright © 2013–2022 Norconex Inc. All rights reserved.