Canaux de publications
Les canaux de publications permettent d'alimenter et maintenir une queue d'entités à traiter.
Un canal se définit dans l'administration et possède les champs suivants:
- Un nom (Nom d'affichage)
- Un identifiant (identifiant unique du canal utilisé par les connecteurs)
- Un identifiant de catalogue (permet d'indiquer un catalogue associé au canal) [optionnel]
- Un JSON de configuration [optionnel]
- Un fichier de configuration utilisé par le connecteur associé [optionnel]
Sur chaque entité, la liste "Canaux de publication" indique les canaux vers lesquels publier l'entité.
Chaque connecteur est en charge d'alimenter son canal. Une fois un connecteur exécuté, les informations suivantes apparaissent dans l'administration:
- Identifiant du batch
- Heure début/fin
- Durée
- Le statut du dernier batch
- L'identifiant du dernier batch en succès
- Le nombre d'entités traitées / en échec sur le dernier batch
- Les éventuelles erreurs du batch
Un batch en succès ne veut pas dire que toutes les entités ont été importées avec succès dans ce batch, mais qu'il est allé au bout.
La dernière date est la date utilisée pour renvoyer les entités du batch, elle est alimentée par le connecteur et peut être modifiée manuellement.
Le champ action permet de:
- Stopper le connecteur ("STOP")
- Rejouer les entités en erreur ("RETRY")
- Resynchroniser toutes les entités ("RESET")
Sur chaque entité, on retrouve:
- Sa date de publication dans le canal
- L'identifiant du dernier batch
- Le statut du dernier batch et les éventuelles erreurs
- La date de modification
Si un catalogue est associé au canal, la date de modification est uniquement mise à jour pour les champs audités du catalogue. Si aucun catalogue n'est associé, cette date est alimentée par la plus grande date entre la date de formulation, modification et création.
Le champ "forcer" permet de forcer cette entité lors du prochain batch.
Par défaut, un canal renvoie les entités membres du canal dont la date de modification est inférieure ou égale à la dernière date.
Configuration
Recherches
Il est possible de configurer une recherche spécifique sur un canal en utilisant le champ Configuration.
Les valeurs suivantes de JSON sont possible:
{
"query": "(@cm\\: created: [% s TO MAX] OR @cm\\: modified: [% s TO MAX]) AND(TYPE: \"bcpg:product\" OR TYPE:\"bcpg:client\" OR TYPE:\"bcpg:supplier\" )",
}
%s est remplacé par le champ dernière date
Ou
{
"dateFilter": {
"dateFilterField": "cm:modified",
"dateFilterType": "Before, After, From, To, Equals",
"dateFilterDelay": 1,
"dateFilterDelayUnit": "Min, Hour, Day"
},
"versionFilter": {
"versionFilterType": "MAJOR, MINOR, NONE"
},
"entityFilter": {
"entityType": "bcpg:semiFinishedProduct",
"criteria": {
"assoc_bcpg_plants_added": "nodeRef"
}
}
}
Le champ dernière date est utilsé dans dateFilter
Connecteur
Il est possible de passer des propriétés au connecteur en utilisant le paramètre properties du JSON de configuration
{
"properties" : {
"connector.notify.enabled": true,
"connector.notify.from":"support@becpg.fr",
"connector.notify.to": "support@becpg.fr",
"remote.extra.fields": "cm:titled,cm:description",
"remote.extra.lists": "bcpg:compoList"
}
}
Api
La liste des entités est obtenue avec la requête LIST.
<url>/becpg/remote/channel/list?channelId={id}</url>
<url>/becpg/remote/channel/list?channelNodeRef={nodeRef}</url>
Le paramètre maxResults permet de spécifier un nombre de résultat (-1 pour avoir tous les résultats)
Le paramètre format permet de modifier le format XML de la réponse
format=xml (Par défaut) format=json (Format json) Le paramètre fields permet de spécifier des champs ou des associations à extraire dans les résultats
Architecture
L'appelant est chargé de mettre à jour les canaux de publications et les entités en suivant la cinématique suivante:
+-------------------+
| |
| Retrieve |
| channel info |
| |
+--------+----------+
|
|
+--------v----------+
| |
| Start batch and |
| reset channel |
| info |
| |
+--------+----------+
|
|
+--------v----------+
| |
| List channel |
| entities |
| |
+--------+----------+
|
|
+----------------v--------------------------------+
| |
| |
| +-------------------+ +-------------------+ |
| | | | | |
| | Retrieve | | Retrieve | |
| | entity info | | entity info | |
| | | | | |
| +--------+----------+ +--------+----------+ |
| | | |
| | | |
| +--------v----------+ +--------v----------+ |
| | | | | |
| | Process entity | | Process entity | |
| | | | | |
| +-------------------+ +-------------------+ |
| | | |
| | | |
| +--------v----------+ +--------v----------+ |
| | | | | |
| | Update channel | | Update channel | |
| | info on entity | | info on entity | |
| | | | | |
| +-------------------+ +-------------------+ |
| |
| |
-----------------^------------------------------- +
|
|
+--------v----------+
| |
| Update channel |
| |
+-------------------+
Récupération des informations du canal de publication
Pour récupérer les informations d'un canal de publication, effectuez une requête HTTP GET à l'URL suivante:
/alfresco/service/becpg/remote/entity?format=json&query=+TYPE:%22bp:pubChannel%22%20AND%20%3Dbp%5C:pubChannelId:%22sample-canal%22%20
Cette requête renverra les informations du canal de publication sous la forme suivante:
{
"entity": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"cm:name": "Sample canal",
"attributes": {
"bp:pubChannelId": "sample-canal",
"bp:pubChannelConfig": "..."
}
}
}
Démarrage du batch et remise à zéro des informations du canal de publication
Pour démarrer un batch et remettre à zéro les informations du canal de publication, effectuez une requête HTTP PUT à l'URL suivante:
/alfresco/service/becpg/remote/entity?format=json&format=json
Envoyez les informations suivantes dans le corps de la requête:
{
"entity": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"attributes": {
"bp:pubChannelBatchDuration": null,
"bp:pubChannelBatchId": 1,
"bp:pubChannelStatus": "STARTED",
"bp:pubChannelBatchEndTime": null,
"bp:pubChannelBatchStartTime": 1672320000652
},
"bp:pubChannelId": "sample-canal"
}
}
Liste des entités du canal de publication
Pour obtenir la liste des entités associées à un canal de publication, effectuez une requête HTTP GET à l'URL suivante:
/alfresco/service/becpg/remote/channel/list?format=json&channelId=sample-canal
Récupération des informations de chaque entité
Pour récupérer les informations d'une entité donnée, effectuez une requête HTTP GET à l'URL suivante:
/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}
Où {entityNodeRef} est le nodeRef de l'entité à récupérer.
Mise à jour des informations du canal sur l'entité
Pour mettre à jour les informations du canal sur une entité donnée, effectuez une requête HTTP POST à l'URL suivante:
/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}
Envoyez les informations suivantes dans le corps de la requête:
{
"entity": {
"datalists": {
"bp:pubChannelList": [
{
"type": "bp:pubChannelList",
"attributes": {
"bp:pubChannellListStatus": "SUCCESS",
"bp:pubChannellListError": "",
"bp:pubChannelListBatchId": 1
},
"bp:pubChannelListChannel": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"bp:pubChannelId": "sample-canal"
}
}
]
}
}
}
Mise à jour du canal de publication une fois le batch terminé
Une fois le batch terminé, mettez à jour le canal de publication en effectuant une requête HTTP PUT à l'URL suivante:
/alfresco/service/becpg/remote/entity?format=json
Envoyez les informations suivantes dans le corps de la requête:
{
"entity": {
"documents": [],
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"attributes": {
"bp:pubChannelLastDate": 1672320540452,
"bp:pubChannelAction": null,
"bp:pubChannelBatchDuration": 74,
"bp:pubChannelBatchId": 1,
"bp:pubChannelStatus": "COMPLETED",
"bp:pubChannelFailCount": 1,
"bp:pubChannelBatchEndTime": 1672320614749,
"bp:pubChannelError": "",
"bp:pubChannelLastSuccessBatchId": 1,
"bp:pubChannelReadCount": 1
},
"bp:pubChannelId": "sample-canal"
}
}
Exemple en utilisant les API Java et Spring batch
/**
* <p>StandardBatchExecutionListener class.</p>
*
* @author matthieu
* @version $Id: $Id
*/
public class StandardBatchExecutionListener extends BatchContextHolder {
private static Logger logger = LoggerFactory.getLogger(StandardBatchExecutionListener.class);
private BatchConfiguration batchConfiguration;
private JobExecution jobExecution;
/**
* <p>Constructor for StandardBatchExecutionListener.</p>
*
* @param batchConfiguration a {@link fr.becpg.connector.service.BatchConfiguration} object
*/
public StandardBatchExecutionListener(BatchConfiguration batchConfiguration) {
super();
this.batchConfiguration = batchConfiguration;
}
/** {@inheritDoc} */
@Override
public void beforeJob(JobExecution jobExecution) {
super.beforeJob(jobExecution);
this.jobExecution = jobExecution;
if (batchConfiguration.destPath() != null) {
File destFolder = new File(batchConfiguration.destPath());
if (!destFolder.exists()) {
destFolder.mkdirs();
}
}
if (batchConfiguration.channelId() != null) {
RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHSTARTTIME, jobExecution.getStartTime());
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, null);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, null);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, jobExecution.getStatus());
channelEntity.setAttributes(attributes);
batchConfiguration.getEntityAPI().update(channelEntity);
}
}
/** {@inheritDoc} */
@Override
public void afterJob(JobExecution jobExecution) {
super.afterJob(jobExecution);
BatchContext batchContext = get();
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, jobExecution.getEndTime());
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, (jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime())/1000);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
BatchStatus status = jobExecution.getStatus();
if (!jobExecution.getStepExecutions().isEmpty()) {
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
attributes.put(ChannelAPIModel.PROP_CHANNEL_READCOUNT, stepExecution.getReadCount());
}
if(!ChannelAPIModel.ACTION_RETRY.equals(jobExecution.getJobParameters().getString(BeCPGConnectorService.ACTION_PARAM))) {
if (ExitStatus.COMPLETED.equals(jobExecution.getExitStatus())) {
attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTSUCCESSBATCHID, jobExecution.getId());
attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTDATE, jobExecution.getStartTime());
if ( batchConfiguration.destPath() != null) {
DateFormat dateFormat = new SimpleDateFormat(BatchConfiguration.UTC_DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Date startDate = jobExecution.getStartTime();
try (BufferedWriter bw = new BufferedWriter(new FileWriter(batchConfiguration.getLastImportDateFile()))) {
bw.write(dateFormat.format(startDate));
} catch (IOException e) {
logger.error("Cannot write last import data", e);
jobExecution.addFailureException(e);
}
}
}
}
attributes.put(ChannelAPIModel.PROP_CHANNEL_ACTION, null);
attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, status);
StringBuilder errors = new StringBuilder();
if (batchContext.hasErrors()) {
if (!batchContext.getErrors().isEmpty()) {
status = BatchStatus.FAILED;
for (String error : batchContext.getErrors()) {
errors.append(error);
errors.append("\n");
}
}
}
List<Throwable> exceptions = jobExecution.getFailureExceptions();
for (Throwable exception : exceptions) {
errors.append(formatExceptionMessage(exception));
errors.append("\n");
}
attributes.put(ChannelAPIModel.PROP_CHANNEL_ERROR, errors.toString());
if (batchContext.getFailedEntities() != null) {
attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, batchContext.getFailedEntities().size());
} else {
attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, 0);
}
if (batchConfiguration.channelId() != null) {
RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());
channelEntity.setAttributes(attributes);
batchConfiguration.getEntityAPI().update(channelEntity);
}
}
private String formatExceptionMessage(Throwable exception) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
return baos.toString();
}
/**
* <p>onReadError.</p>
*
* @param ex a {@link java.lang.Exception} object
*/
@OnReadError
public void onReadError(Exception ex) {
get().addError(ex.getMessage());
}
/**
* <p>onProcessError.</p>
*
* @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
* @param ex a {@link java.lang.Exception} object
*/
@OnProcessError
public void onProcessError(RemoteEntityRef item, Exception ex) {
sendError(item.getEntity().getId(), ex.getMessage());
}
/**
* <p>afterProcess.</p>
*
* @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
* @param root a {@link fr.becpg.connector.model.RemoteEntityItemContext} object
*/
@AfterProcess
public void afterProcess(RemoteEntityRef item, RemoteEntityItemContext root) {
if (logger.isDebugEnabled()) {
ObjectMapper objectMapper = new ObjectMapper();
try {
logger.debug(objectMapper.setSerializationInclusion(Include.NON_NULL).writerWithDefaultPrettyPrinter()
.writeValueAsString(root.getEntity()));
} catch (JsonProcessingException e) {
logger.error("Invalid RemoteEntity", e);
}
}
}
/**
* <p>onWriteError.</p>
*
* @param ex a {@link java.lang.Exception} object
* @param items a {@link java.util.List} object
*/
@OnWriteError
public void onWriteError(Exception ex, List<? extends RemoteEntityItemContext> items) {
for (RemoteEntityItemContext item : items) {
sendError(item.getEntity().getId(), ex.getMessage());
}
}
/**
* <p>afterWrite.</p>
*
* @param items a {@link java.util.List} object
*/
@AfterWrite
public void afterWrite(List<? extends RemoteEntityItemContext> items) {
BatchContext batchContext = get();
for (RemoteEntityItemContext item : items) {
if (batchContext.getFailedEntities().containsKey(item.getEntity().getId())) {
sendError(item.getEntity().getId(), batchContext.getFailedEntities().get(item.getEntity().getId()));
} else {
sendSuccess(item.getEntity().getId());
}
}
if (logger.isInfoEnabled()) {
Integer totalCount = (Integer) jobExecution.getExecutionContext().get(BeCPGConnectorService.TOTAL_COUNT_PARAM);
if (totalCount != null) {
Integer reads = 0;
for (StepExecution step : jobExecution.getStepExecutions()) {
reads = reads + step.getReadCount();
}
logger.info("Write {}/{} entities - [ {} %]", reads,totalCount, Math.round((reads * 1d) / (totalCount * 1d) * 100) );
}
}
}
private void sendSuccess(String id) {
if (batchConfiguration.channelId() != null) {
try {
RemoteEntity entity = new RemoteEntity();
entity.setId(id);
RemoteEntity publicationChannelListItem = new RemoteEntity();
publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);
Map<String, Object> identifiers = new HashMap<>();
identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
publicationChannelListItem.setOptionalIdentifiers(identifiers);
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_PUBLISHEDDATE, new Date());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_COMPLETED);
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, "");
publicationChannelListItem.setAttributes(attributes);
Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();
datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));
entity.setDatalists(datalists);
batchConfiguration.getEntityAPI().update(entity);
} catch (RemoteAPIException e) {
get().addError(id + " - " + e.getMessage());
logger.error("sendSuccess error: {} - {}" ,id, e.getMessage());
}
}
get().addVisitedEntity(id);
}
private void sendError(String id, String error) {
if (logger.isInfoEnabled()) {
logger.info("Error: {}" , error);
}
if (batchConfiguration.channelId() != null) {
try {
RemoteEntity entity = new RemoteEntity();
entity.setId(id);
RemoteEntity publicationChannelListItem = new RemoteEntity();
publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);
Map<String, Object> identifiers = new HashMap<>();
identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
publicationChannelListItem.setOptionalIdentifiers(identifiers);
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_FAILED);
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, error);
publicationChannelListItem.setAttributes(attributes);
Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();
datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));
entity.setDatalists(datalists);
batchConfiguration.getEntityAPI().update(entity);
} catch (RemoteAPIException e) {
get().addError(id + " - " + error);
logger.error("sendError error: {} - {} - {}" ,id, e.getMessage(), error);
}
}
get().addFailedEntity(id, error);
}
}