Skip to content

Commit

Permalink
* feat: Listens for transcriber backend enabled flag.
Browse files Browse the repository at this point in the history
* squash: Moves disco info discovery to JvbConference.

* feat: Listens for transcriber backend enabled flag.

When backend enabled, even if no one is listening for transcriptions we keep going.

* squash: Moves a log.

* squash: Simplifies the code.

* squash: Extracts a helper method for cleaner code.
  • Loading branch information
damencho authored Feb 16, 2024
1 parent 0a047ff commit d83c921
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 74 deletions.
38 changes: 2 additions & 36 deletions src/main/java/org/jitsi/jigasi/AudioModeration.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,43 +468,9 @@ public void mute()
/**
* The xmpp provider for JvbConference has registered after connecting.
*/
public void xmppProviderRegistered()
void setAvModerationAddress(String address)
{
// we are here in the RegisterThread, and it is safe to query and wait
// Uses disco info to discover the AV moderation address.
// we need to query the domain part extracted from room jid
if (this.callContext.getRoomJidDomain() != null)
{
try
{
long startQuery = System.currentTimeMillis();

// in case when running unittests
if (this.jvbConference.getConnection() == null)
{
return;
}

DiscoverInfo info = ServiceDiscoveryManager.getInstanceFor(this.jvbConference.getConnection())
.discoverInfo(JidCreate.domainBareFrom(this.callContext.getRoomJidDomain()));

DiscoverInfo.Identity avIdentity =
info.getIdentities().stream().
filter(di -> di.getCategory().equals("component") && di.getType().equals("av_moderation"))
.findFirst().orElse(null);

if (avIdentity != null)
{
this.avModerationAddress = avIdentity.getName();
logger.info(String.format("%s Discovered %s for %oms.",
this.callContext, this.avModerationAddress, System.currentTimeMillis() - startQuery));
}
}
catch(Exception e)
{
logger.error("Error querying for av moderation address", e);
}
}
this.avModerationAddress = address;

if (this.avModerationAddress != null)
{
Expand Down
157 changes: 133 additions & 24 deletions src/main/java/org/jitsi/jigasi/JvbConference.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.jivesoftware.smackx.xdata.packet.*;
import org.jivesoftware.smackx.xdata.*;
import org.json.simple.*;
import org.json.simple.parser.*;
import org.jxmpp.jid.*;
import org.jxmpp.jid.impl.*;
import org.jxmpp.jid.parts.*;
Expand Down Expand Up @@ -364,7 +365,12 @@ private static ExtensionElement addSupportedFeatures(
/**
* Listens for room configuration changes and request room config to reflect it locally.
*/
private RoomConfigurationChangeListener roomConfigurationListener = null;
private final RoomConfigurationChangeListener roomConfigurationListener = new RoomConfigurationChangeListener();

/**
* Listens for messages from room metadata component for changes in room metadata.
*/
private final RoomMetadataListener roomMetadataListener = new RoomMetadataListener();

/**
* Up-to-date list of participants in the room that are jigasi.
Expand Down Expand Up @@ -660,14 +666,9 @@ public void registrationStateChanged(RegistrationStateChangeEvent evt)

private synchronized void registrationStateChangedInternal(RegistrationStateChangeEvent evt)
{
if (started
&& mucRoom == null
&& evt.getNewState() == RegistrationState.REGISTERED)
if (started && mucRoom == null && evt.getNewState() == RegistrationState.REGISTERED)
{
if (this.getAudioModeration() != null)
{
this.getAudioModeration().xmppProviderRegistered();
}
discoverComponentAddresses();

// Join the MUC
joinConferenceRoom();
Expand Down Expand Up @@ -729,6 +730,62 @@ else if (evt.getNewState() == RegistrationState.CONNECTION_FAILED)
}
}

/**
* Disco info the addresses, the query is cached and will be returned from cache
* once we retrieve it.
*/
private void discoverComponentAddresses()
{
// we are here in the RegisterThread, and it is safe to query and wait
// Uses disco info to discover the AV moderation address.
// we need to query the domain part extracted from room jid
if (this.callContext.getRoomJidDomain() != null)
{
try
{
long startQuery = System.currentTimeMillis();

// in case when running unittests
if (this.getConnection() == null)
{
return;
}

DiscoverInfo info = ServiceDiscoveryManager.getInstanceFor(this.getConnection())
.discoverInfo(JidCreate.domainBareFrom(this.callContext.getRoomJidDomain()));

logger.info(String.format("%s Disco-info took %oms.", this.callContext, System.currentTimeMillis() - startQuery));

DiscoverInfo.Identity avIdentity = info.getIdentities().stream().
filter(di -> di.getCategory().equals("component") && di.getType().equals("av_moderation"))
.findFirst().orElse(null);

if (avIdentity != null && this.getAudioModeration() != null)
{
String avModerationAddress = avIdentity.getName();
this.getAudioModeration().setAvModerationAddress(avModerationAddress);
}

DiscoverInfo.Identity roomMetadataIdentity = info.getIdentities().stream().
filter(di -> di.getCategory().equals("component") && di.getType().equals("room_metadata"))
.findFirst().orElse(null);

// we process room metadata messages only when we are transcribing
if (roomMetadataIdentity != null && this.gatewaySession instanceof TranscriptionGatewaySession)
{
getConnection().addAsyncStanzaListener(roomMetadataListener,
new AndFilter(
MessageTypeFilter.NORMAL,
FromMatchesFilter.create(JidCreate.domainBareFrom(roomMetadataIdentity.getName()))));
}
}
catch(Exception e)
{
logger.error("Error querying for av moderation address", e);
}
}
}

/**
* Returns <tt>true</tt> if we are currently in JVB conference room.
* @return <tt>true</tt> if we are currently in JVB conference room.
Expand Down Expand Up @@ -885,21 +942,19 @@ public void joinConferenceRoom()

gatewaySession.notifyJvbRoomJoined();

if (lobbyEnabled)
{
// let's check room config
updateFromRoomConfiguration();
}

// let's listen for any future changes in room configuration, whether lobby will be enabled/disabled
if (roomConfigurationListener == null && mucRoom instanceof ChatRoomJabberImpl)
if (mucRoom instanceof ChatRoomJabberImpl)
{
roomConfigurationListener = new RoomConfigurationChangeListener();
getConnection().addAsyncStanzaListener(roomConfigurationListener,
new AndFilter(
FromMatchesFilter.create(((ChatRoomJabberImpl)this.mucRoom).getIdentifierAsJid()),
MessageTypeFilter.GROUPCHAT));
}

// let's check room config
updateFromRoomConfiguration();

logger.info(this.callContext + " Joined room: " + roomName + " meetingId:" + this.getMeetingId());
}
catch (Exception e)
{
Expand Down Expand Up @@ -1074,15 +1129,11 @@ private void leaveConferenceRoom()
= xmppProvider.getOperationSet(OperationSetMultiUserChat.class);
muc.removePresenceListener(this);

if (this.roomConfigurationListener != null)
XMPPConnection connection = getConnection();
if (connection != null)
{
XMPPConnection connection = getConnection();
if (connection != null)
{
connection.removeAsyncStanzaListener(roomConfigurationListener);
}

this.roomConfigurationListener = null;
connection.removeAsyncStanzaListener(roomConfigurationListener);
connection.removeAsyncStanzaListener(roomMetadataListener);
}

// remove listener needs to be after leave,
Expand Down Expand Up @@ -1952,13 +2003,51 @@ private void updateFromRoomConfiguration()
boolean singleModeratorEnabled = df.getField(Lobby.DATA_FORM_SINGLE_MODERATOR_FIELD) != null;
setLobbyEnabled(lobbyEnabled);
this.singleModeratorEnabled = singleModeratorEnabled;

List<String> roomMetadataValues
= df.getField(TranscriptionGatewaySession.DATA_FORM_ROOM_METADATA_FIELD).getValuesAsString();
if (roomMetadataValues != null && !roomMetadataValues.isEmpty())
{
// it is supposed to have a single value
processRoomMetadataJson(roomMetadataValues.get(0));
}
}
catch(Exception e)
{
logger.error(this.callContext + " Error checking room configuration", e);
}
}

private void processRoomMetadataJson(String json)
{
if (!(this.gatewaySession instanceof TranscriptionGatewaySession))
{
return;
}

try
{
Object o = new JSONParser().parse(json);

if (o instanceof JSONObject)
{
JSONObject data = (JSONObject) o;

if (data.get("type").equals("room_metadata"))
{
JSONObject metadataObj = (JSONObject)data.getOrDefault("metadata", new JSONObject());
JSONObject recordingObj = (JSONObject)metadataObj.getOrDefault("recording", new JSONObject());
((TranscriptionGatewaySession)this.gatewaySession).setBackendTranscribingEnabled(
(boolean)recordingObj.getOrDefault("isTranscribingEnabled", false));
}
}
}
catch(Exception e)
{
logger.error(callContext + " Error parsing", e);
}
}

/**
* Threads handles the timeout for stopping the conference.
* For waiting for conference call invite sent by the focus or for waiting
Expand Down Expand Up @@ -2135,6 +2224,26 @@ public void processStanza(Stanza stanza)
}
}

/**
* When a room metadata change is received.
*/
private class RoomMetadataListener
implements StanzaListener
{
@Override
public void processStanza(Stanza stanza)
{
JsonMessageExtension jsonMsg = stanza.getExtension(JsonMessageExtension.class);

if (jsonMsg == null)
{
return;
}

processRoomMetadataJson(jsonMsg.getJson());
}
}

/**
* Used to check the jvb side of the call for any activity.
*/
Expand Down
48 changes: 38 additions & 10 deletions src/main/java/org/jitsi/jigasi/TranscriptionGatewaySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public class TranscriptionGatewaySession
private final static Logger logger
= Logger.getLogger(TranscriptionGatewaySession.class);

/**
* The data form field added when transcription is enabled.
*/
public static final String DATA_FORM_ROOM_METADATA_FIELD = "muc#roominfo_jitsimetadata";

/**
* The display name which should be displayed when Jigasi joins the
* room
Expand Down Expand Up @@ -101,6 +106,13 @@ public class TranscriptionGatewaySession
private List<TranscriptPublisher.Promise> finalTranscriptPromises
= new LinkedList<>();

/**
* When a backend transcribing is enabled, overrides participants request for transcriptions and keep the
* transcriber in the room and working even though no participant is requesting it.
* This is used to make transcriptions available for post-processing.
*/
private boolean isBackendTranscribingEnabled = false;

/**
* Create a TranscriptionGatewaySession which can handle the transcription
* of a JVB conference
Expand Down Expand Up @@ -308,8 +320,17 @@ void notifyChatRoomMemberUpdated(ChatRoomMember chatMember, Presence presence)
String identifier = getParticipantIdentifier(chatMember);
this.transcriber.updateParticipant(identifier, chatMember);

if (transcriber.isTranscribing() &&
!transcriber.isAnyParticipantRequestingTranscription())
this.maybeStopTranscription();
}

private boolean isTranscriptionRequested()
{
return transcriber.isAnyParticipantRequestingTranscription() || isBackendTranscribingEnabled;
}

private void maybeStopTranscription()
{
if (transcriber.isTranscribing() && !isTranscriptionRequested())
{
new Thread(() ->
{
Expand All @@ -322,7 +343,7 @@ void notifyChatRoomMemberUpdated(ChatRoomMember chatMember, Presence presence)
logger.error(e);
}

if (!transcriber.isAnyParticipantRequestingTranscription())
if (!isTranscriptionRequested())
{
jvbConference.stop();
}
Expand Down Expand Up @@ -666,13 +687,10 @@ public void notify(Transcriber transcriber, TranscriptEvent event)
{
// in will_end we will be still transcribing but we need
// to explicitly send off
TranscriptionStatusExtension.Status status
= event.getEvent() ==
Transcript.TranscriptEventType.WILL_END ?
TranscriptionStatusExtension.Status.OFF
: transcriber.isTranscribing() ?
TranscriptionStatusExtension.Status.ON
: TranscriptionStatusExtension.Status.OFF;
TranscriptionStatusExtension.Status status = event.getEvent() == Transcript.TranscriptEventType.WILL_END
? TranscriptionStatusExtension.Status.OFF
: transcriber.isTranscribing()
? TranscriptionStatusExtension.Status.ON : TranscriptionStatusExtension.Status.OFF;

TranscriptionStatusExtension extension
= new TranscriptionStatusExtension();
Expand All @@ -689,4 +707,14 @@ public boolean hasCallResumeSupport()
{
return false;
}

/**
* Sets whether backend transcriptions are enabled or not.
*/
public void setBackendTranscribingEnabled(boolean backendTranscribingEnabled)
{
this.isBackendTranscribingEnabled = backendTranscribingEnabled;

this.maybeStopTranscription();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -798,10 +798,7 @@ private Participant getParticipant(String identifier)
*/
public boolean isAnyParticipantRequestingTranscription()
{

return getParticipants()
.stream()
.anyMatch(Participant::isRequestingTranscription);
return getParticipants().stream().anyMatch(Participant::isRequestingTranscription);
}

/**
Expand Down

0 comments on commit d83c921

Please sign in to comment.