<CF_HOME>/cfusion/bin に移動します。
概要
Microsoft Azure Service Bus は、管理されたエンタープライズ統合メッセージブローカーです。Service Bus は、アプリケーションとサービスを切り離すことができます。Service Bus は、非同期データおよび状態転送用のプラットフォームを提供します。
データは、メッセージを使用して様々なアプリケーションおよびサービス間で転送されます。メッセージは、バイナリ形式で、JSON、XML またはテキストを含めることができます。
データは、メッセージを使用して様々なアプリケーションおよびサービス間で転送されます。メッセージは、バイナリ形式で、JSON、XML またはテキストを含めることができます。
Service Bus を使用すると、データを転送したり、拡張性を高めたり、ワークフローを実装したり、トピックやサブスクリプションを管理したり、その他様々なことができます。
詳しくは、Service Bus の概要を参照してください。
名前空間
名前空間は、すべてのメッセージングコンポーネントのコンテナです。単一の名前空間に複数のキューおよびトピックを含めることができます。名前空間は、多くの場合、様々なアプリケーションおよびサービスのコンテナとして提供されます。
Message
メッセージは、キューに対して送受信されます。キューは、受信するアプリケーションが受信および処理できるようになるまで、メッセージを保存します。
トピック
トピックを使用して、メッセージを送受信することもできます。キューは、多くの場合、ポイントツーポイント通信に使用されますが、トピックは、パブリッシュ/サブスクライブのシナリオで便利です。
はじめに
azureservicebus パッケージのインストール
Zip インストーラーを使用する場合に限り、Adobe ColdFusion(2021 リリース)はモジュール化されています。デフォルトでは、Azure Service Bus のパッケージはインストールされていません。最初の手順は、ColdFusion の Service Bus パッケージをインストールすることです。
注意:GUI インストーラーを使用する場合、これらのパッケージは事前にインストールされています。
Service Bus のパッケージは、azureservicebus と呼ばれます。
azureservicebus パッケージをインストールするには、ColdFusion Administrator のパッケージマネージャーページを使用するか、以下の手順に従います。
-
-
次のコマンドを入力します。
- Windows:cfpm.bat
- Linux:cfpm.sh
-
コマンド install azureservicebus を入力します。
Azure Service Bus パッケージがインストールされるまで待ちます。
詳しくは、ColdFusion パッケージマネージャーを参照してください。
クラウドサービス資格情報および設定の追加
ColdFusion(2021 リリース)には、様々なクラウドサービスにアクセスするためのオブジェクトを作成するためにハンドルを提供するメソッド、getCloudService() があります。
サービスハンドルのシンタックスを次に示します。
service=getCloudService(cloudCred,cloudConfig)
- cloudCred:クラウドサービスの資格情報を定義します。構造体または文字列です(資格情報エイリアスとも呼ばれます)。
- cloudConfig:クラウドサービス設定の詳細を定義します。構造体または文字列です(設定エイリアスとも呼ばれます)。
Azure 資格情報を獲得したら、次のいずれかの方法でこれらの資格情報を宣言する必要があります。それによって初めて、これらの資格情報を使用して Service Bus オブジェクトを作成でき、それからそのオブジェクトを使用して様々な Service Bus メソッドを呼び出すことができます。
ColdFusion Administrator
資格情報の設定
ColdFusion Administrator で、データとサービス/クラウド資格情報をクリックします。
エイリアスは、クラウドサービスおよびその設定の詳細の名前付き表現です。ColdFusion Administrator を使用して設定エイリアスを設定できます。
接続文字列には、アプリケーションから Azure Storage アカウントのデータにアクセスするために必要な認証情報が含まれています。
詳しくは、Azure Storage の接続文字列を構成するを参照してください。
詳細を入力したら、「資格情報を追加」をクリックします。
設定オプションの設定
ColdFusion Administrator で、「データとサービス/クラウド設定」をクリックします。
設定エイリアス、ベンダー、サービスの名前など、次の詳細を入力します。
設定オプションを追加した後、さらにいくつかのオプションを追加する必要がある場合があります。次に表示される画面で追加できます。場合によって追加する必要があるオプションを次に示します。
- 操作タイムアウト
- 再試行ポリシー
詳しくは、クラウド設定オプションを参照してください。
オブジェクトの作成
Blob 資格情報および設定オプションのエイリアスを作成したら、getCloudService API を使用してオブジェクトを作成し、次のコードを CFM に含めることができます。
sbObject= getCloudService("sbCred", "sbConf")
Application.cfc
Application.cfc で Blob 資格情報および設定オプションを指定できます。次に例を示します。
<cfscript> this.sqsCred = { "alias" : "firstAWSCred", "vendorName" : "AZURE", "region" : "us-west-2", "secretAccessKey" : "xxxxxxxxxxxxxxxxxxx", "accessKeyId" : "xxxxxxxxxxxxxxxxxx" } this.sqsConf = { "serviceName" : "SERVICE_BUS" } this.serviceBusCred = { "vendorName" : "AZURE", "connectionString" : "xxxxxxxxxx" } this.serviceBusConf = { "serviceName" : "SERVICE_BUS" } </cfscript>
オブジェクトの作成
sbObject = getCloudService(this.serviceBusCred, this.serviceBusConf)
CFM ページ上
CFM ページでは、次に示す 4 つの方法のいずれかで Service Bus 資格情報および設定オプションを指定できます。
資格情報エイリアスおよび設定エイリアス
Azure Blob 資格情報および設定オプション用のエイリアスを作成したら、次に示すように、getCloudService ハンドルでそれらのエイリアスを使用できます。
<cfscript> // ColdFusion Administrator で資格情報エイリアスと設定エイリアスを定義 sbObject=getCloudService("sbCred","sbConf") // 以下にコードを記述 ...........</cfscript>
資格情報エイリアスおよび設定オプションの構造体
<cfscript> sbCred = { "vendorName" : "AZURE", "connectionString" : "xxxxxx" } sbConf = { "serviceName" : "AZURE_SERVICEBUS", "options" : { "operationTimeout" : "10s", "retryPolicy":"default" } } </cfscript>
設定エイリアスおよび資格情報の構造体
<cfscript> // 設定エイリアスとサービス資格情報構造体を使用 // Service Bus 資格情報 sbCred= { "vendorName" : "AZURE", "connectionString" : "xxxxxx" } sb= getCloudService(sbCred, "sbConf") // 以下にコードを記述 .....................................</cfscript>
資格情報と設定オプションの両方の構造体
<cfscript> // クラウドの資格情報と設定の構造体を使用 sbCred = { "vendorName" : "AZURE", "connectionString" : "xxxxxx" } sbConf = { "serviceName" : "AZURE_SERVICEBUS", "options" : { "operationTimeout" : "10s", "retryPolicy":"default" } } sb= getCloudService(sbCred , sbConf ) // 以下にコードを記述 ...................................................................</cfscript>
Admin API
また、Admin API を使用して Service Bus 資格情報および設定オプションを追加することもできます。資格情報および設定を追加するためのメソッドは、cloud.cfc で使用できます。
次に、メソッド addCredential(struct credential) および addServiceConfig(struct config) の使用例を示します。
資格情報の追加
<cfscript> // Administrator コンポーネントのオブジェクトを作成して login メソッドを呼び出す adminObj = createObject("component","cfide.adminapi.administrator") adminObj.login("admin") // クラウドコンポーネントのオブジェクトを作成 cloudObj = createObject("component","cfide.adminapi.cloud") // 資格情報構造体を定義 credentialStruct={ "vendorName" : "AZURE", "connectionString" : "xxxxxx" } // 資格情報の credentialStruct を追加 try{ cloudObj.addCredential(credentialStruct) writeOutput("Credentials added successfully") } catch(any e){ writeDump(e) } </cfscript>
設定の追加
<cfscript> // Administrator コンポーネントのオブジェクトを作成して login メソッドを呼び出す adminObj = createObject("component","cfide.adminapi.administrator") adminObj.login("admin") // クラウドコンポーネントのオブジェクトを作成 cloudObj = createObject("component","cfide.adminapi.cloud") // 設定構造体を定義 configStruct= { "serviceName" : "AZURE_SERVICEBUS", "options" : { "operationTimeout" : "10s", "retryPolicy":"default" } } // 設定の configStruct を追加 try{ cloudObj.addServiceConfig(configStruct) writeOutput("Configuration service added successfully") } catch(any e){ writeDump(e) } </cfscript>
CFSetup
CFSetup 設定ユーティリティを使用して SQS の資格情報と設定をセットアップすることもできます。
クラウド資格情報の追加
- add cloudcredential credentialAlias=s3cred accesskeyid=<access> secretaccesskey=<secret> region=ap-southeast-1 vendorname=AWS
資格情報の設定
- set cloudcredential snscred secretaccesskey=awssecret
クラウド設定の追加
- add cloudconfiguration serviceName=S3 alias=s3Config
設定の設定
- set cloudconfiguration conf1 alias=conf2
ColdFusion Administrator
ColdFusion Administrator/設定で、Service Bus イベントの処理に使用されるスレッドプール設定を変更できます。
- コアプールサイズ:保持しておく必要があるワーカースレッドの最小数です。
- 最大プールサイズ:プールで使用可能なスレッドの最大数です。
- キープアライブ時間:アイドルスレッドのタイムアウトまでの待機時間(ミリ秒単位)です。
トピックの作成
Service Bus では、トピックに複数の独立したサブスクリプションを含めることができます。トピックのサブスクライバーは、そのトピックに送信された各メッセージのコピーを受信できます。複数のレシーバーが 1 つのサブスクリプションに割り当てられる可能性があります。トピックには 0 個以上のサブスクリプションを含めることができますが、サブスクリプションは 1 つのトピックにのみ属することができます。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // トピックメタデータ createTopicMetadata = { "autoDeleteOnIdle" = "45M", "requiresDuplicateDetection" = "yes", //"maxSize" = 1024, "entityStatus" = "Active", "userMetadata" = "my user metadata", "enableBatchedOperations" = "true", "enablePartitioning" = "yes" } //sb.deleteTopic("secondTopic") tempTopic = sb.createTopic("Topic_005", createTopicMetadata) //sb.deleteTopic("firstTopic") writeDump(tempTopic) //writeDump(sb.listTopics()) </cfscript>
サブスクリプションの作成
アプリケーションは、キューから受信するのと同じ方法で、QueueClient または MessageSession オブジェクトを使用して、サブスクリプションからメッセージを受信します。トピックには 0 個以上のサブスクリプションを含めることができますが、サブスクリプションは 1 つのトピックにのみ属することができます。詳しくは、Azure SB サブスクリプションを参照してください。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) subscriptionMetadata={ "subscriptionName"="sub_1", "topicPath"="Topic_005", "lockDuration" = "77s", "maxDeliveryCount" = 12, "enableDeadLetteringOnMessageExpiration"="true", "enableDeadLetteringOnFilterEvaluationException"="true" } mysub=sb.subscribe(subscriptionMetadata) writeDump(mysub) </cfscript>
キューへのメッセージの送信
キューを作成したら、sendMessage 関数を使用して、メッセージをキューに送信できます。詳しくは、公式ドキュメントを参照してください。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) sb.deleteQueue("Queue18") // メッセージを送信 sendMessageMetadata = { "messageBody" = "my message is here", "messageProperties" = { "myatt1" = "att1,att2" } } tempQ = sb.createQueue("Queue18") //writedump(tempQ) mypath=tempQ.getPath() sendMessageResponse = tempQ.sendMessage(sendMessageMetadata) writeDump(sendMessageResponse) </cfscript>
メンバー関数
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf); myQ = sb.listQueues().get("queueList")[1] sendMessage() public function sendMessage() { writeoutput("send message response") closure = function (){ writeoutput("my function my choice") } sendMessageMetadata = { "messageBody" = "my message my choice", "messageProperties" = { "myatt1" = "att1,att2" } } sendMessageResponse = myQ.sendMessage(sendMessageMetadata) writeDump(sendMessageResponse) } </cfscript>
キューのリスト表示
すべてのキューをリスト表示するには、listQueues 関数を使用します。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // すべてのキューをリストアップ allQueues=sb.listQueues() writeDump(allQueues) </cfscript>
メンバー関数
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf); myQ = sb.listQueues().get("queueList")[1] writeDump(myQ.getDescription()) </cfscript>
キューの削除
キューを削除するには、deleteQueue 関数を使用します。この関数は、キューの名前をパラメーターとして取ります。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // すべてのキューをリストアップ try{ sb.deleteQueue("Queue8") writeOutput("Queue deleted successfully") } catch (any e){ writeOutput("Unable to delete queue") } </cfscript>
キューの更新
キューを更新するには、updateQueue 関数を使用します。この関数は、次のパラメーターを受け入れます。
- キューの名前
- メタデータ構造体
メタデータ構造体は、次の値で構成されます。
- "autoDeleteOnIdle" = "time",
- "enableDeadLetteringOnMessageExpiration" = false|true,
- "entityStatus" = "Disabled|Enabled",
- "userMetadata" = "some metadata",
- "enableBatchedOperations" = "no|yes"
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // キュー属性を定義 lockDuration = 51 qName = "Q_001" queueAttributes = { "autoDeleteOnIdle" = "240h", "enableDeadLetteringOnMessageExpiration" = true, "maxDeliveryCount" = 101, "requiresDuplicateDetection" = "YES", // 更新で変更できない"lockDuration" = "#lockDuration#s", //"maxSizeInMB" = 2048, // デフォルト値を確認 "entityStatus" = "Active", "userMetadata" = "some user metadata : data val", "enableBatchedOperations" = true, "enablePartitioning" = false // 更新で変更できない} queueAttributesUpdated = { "autoDeleteOnIdle" = "10M", "enableDeadLetteringOnMessageExpiration" = false, "entityStatus" = "Disabled", "userMetadata" = "some new user metadata", "enableBatchedOperations" = "no" } // キューを作成 sb.createQueue(qName, queueAttributes) try{ sb.updateQueue(qName, queueAttributesUpdated) writeOutput("Queue updated successfully.") } catch (any e){ writeOutput("Could not update queue.") } </cfscript>
プリフェッチ
公式 Service Bus クライアントのいずれかでプリフェッチが有効な場合、レシーバーは、アプリケーションが最初に要求した数を超えて、最大で PrefetchCount 上限まで、メッセージを自動的に獲得します。詳しくは、Service Bus のプリフェッチを参照してください。
setPrefetchCount
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // キューを作成 myQueue=sb.createQueue("Q_004") // プリフェッチ回数を設定 try{ setCount=sb.setPrefectCount(myQueue,50) writeOutput("Prefetch count set successfully.") } catch (any e){ writeOutput("Unable to set prefetch count.") } </cfscript>
getPrefetchCount
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // プリフェッチ回数を取得 getCount=sb.getPrefetchCount("Q_004") writeDump(getCount) </cfscript>
キューへのメッセージの送信
sendMessageBatch 関数を使用して、メッセージのバッチをキューに送信します。この関数は、次のパラメーターを受け入れます。
- キューの名前
- キューに送信されるメッセージ本文
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) msgLockDurationInQ = 3 pauseTimeInSecs = (msgLockDurationInQ+3) * 1000 msgCount = 5 qName = "Q_batchOps" queueAttributes = { "maxDeliveryCount" = 100, "lockDuration" = "#msgLockDurationInQ#s" } // キューを作成 myQueue=sb.createQueue(qName,queueAttributes) // メッセージを一括送信 msgBatch = arrayNew(1) for(n=1; n <= msgCount; n++){ msgInstance = { "messageId" = "#n#", "messageBody" = "msg time stamp - #now()#" } msgBatch[n] = msgInstance } messageBatchResponse=sb.sendMessageBatch(qName, msgBatch) writeDump(messageBatchResponse) </cfscript>
メッセージのスケジュール
例えば、特定の時簡にシステムによって処理が可能になるようにジョブをスケジュールするために、遅延処理用にキューまたはトピックにメッセージを送信できます。詳しくは、公式ドキュメントを参照してください。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // キューを作成 myQueue=sb.createQueue("Q_006") schedMessage={ "messageId" = "001", "messageBody" = "message body", "ScheduledEnqueueTimeUtc"="02/02/2020" } scheduleResponse=sb.scheduleMessageToQueue(myQueue, schedMessage) writeDump(scheduleResponse) </cfscript>
メッセージの参照
メッセージ参照(またはピーク)を使用すると、Service Bus クライアントで、キューまたはサブスクリプションに存在するすべてのメッセージを(通常は診断およびデバッグ目的で)列挙できます。詳しくは、公式ドキュメントを参照してください。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) msgCount = 3 msgLockDuration = 3 qName = "Q_Peek_Ex" queueAttributes = { "maxDeliveryCount" = msgCount, "lockDuration" = "#msgLockDuration#s" } queue = sb.createQueue(qName, queueAttributes) peekProps = { "fromSequenceNumber" = 1, "messageCount" = msgCount - 1 } writeOutput("peeking q msgs...<br>") writeDump(queue.peekMessage(peekProps)) </cfscript>
トピックの更新
トピックを作成したら、updateTopic 関数を使用して、トピックを更新できます。この関数は、次のパラメーターを受け入れます。
- トピック
- トピックメタデータ
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // トピック属性を定義 topicName = "topic_001"; autoDeleteOnIdle = 45 topicAttributes = { "autoDeleteOnIdle" = "#autoDeleteOnIdle#M", //"requiresDuplicateDetection" = "yes", // 変更できない。デフォルト:false "maxSizeInMB" = 1024, "entityStatus" = "Active", "userMetadata" = "modified metadata", "enableBatchedOperations" = true, "enablePartitioning" = "no" } // トピックを作成 topic = sb.createTopic(topicName) // デフォルト属性を使用して作成 writeOutput("created topic: " & topic.getPath() & "<br>") // トピックを更新 try{ sb.updateTopic(topicName, topicAttributes) writeOutput("Topic " & topicName & " updated successfully") } catch (any e){ writeOutput("Unable to update topic") } </cfscript>
トピックの削除
作成したトピックを削除します。deleteTopic 関数を使用します。この関数は、トピックパスを引数として受け取ります。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) topicName="topic_001" // 作成済みのトピック topic_001 を削除 try{ sb.deleteTopic(topicName) writeOutput("Topic " & topicName & " deleted successfully") } catch(any e){ writeOutput("Unable to delete the topic") } </cfscript>
トピックのリスト表示
アカウント内のすべてのトピックをリスト表示します。listTopics 関数を使用します。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // 2 つのトピックを作成 sb.createTopic("T1") // トピック 1 sb.createTopic("T2") // トピック 2 // すべてのトピックをリストアップ listResponse=sb.listTopics() writeDump(listResponse) </cfscript>
トピックからのサブスクリプション解除
unsubscribe メソッドを使用して、トピックをサブスクリプション解除できます。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) topicName = "topic-for-sub"; subscriptionName = "subsciption-001" subscriptionAttribs = { "subscriptionName" = subscriptionName, "autoDeleteOnIdle" = "5M", "maxDeliveryCount" = 10, "lockDuration" = "10s", "defaultMessageTimeToLive" = "10M" } // トピックを作成 try{ topic=sb.createTopic(topicName) // デフォルト属性を使用して作成 writeOutput("Topic created successfully") } catch(any e){ writeOutput("Unable to create topic") } // サブスクリプションを作成 try{ subscription = topic.subscribe(topicName, subscriptionAttribs) // デフォルト属性を使用して作成 writeOutput("Subscribed to topic successfully") } catch(any e){ writeOutput("Unable to subscribe to topic") } // トピックのサブスクリプションを解除 //unsubscribe(String topicPath, String subscriptionName) try{ sb.unsubscribe(topic,subscription) writeOutput("Unsubscribed successfully") } catch(any e){ writeOutput("Unable to unsubscribe") } </cfscript>
サブスクリプション名の取得
以前作成したサブスクリプションの名前を取得するには、getSubscription 関数を使用します。この関数は、トピックおよびサブスクリプションの名前を受け入れます。
例
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) topicName = "testTopic" // トピックを作成 topic=sb.createTopic(topicName) subscriptionName = "sub-00" // サブスクリプションを取得 subscription = topic.getSubscription(subscriptionName) writeDump(subsciption) </cfscript>
メッセージハンドラーの登録
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // void registerMessageHandler(String topicPath, String subscriptionName, Map metadata) // トピックを作成 // トピックメタデータ createTopicMetadata = { "autoDeleteOnIdle" = "45M", "requiresDuplicateDetection" = "yes", //"maxSize" = 1024, "entityStatus" = "Active", "userMetadata" = "my user metadata", "enableBatchedOperations" = "true", "enablePartitioning" = "yes" } topic = sb.createTopic("Topic_013", createTopicMetadata) // サブスクリプションを作成 subscriptionName = "Sub_013" subscriptionAttribs = { "subscriptionName" = subscriptionName, "autoDeleteOnIdle" = "5M", "maxDeliveryCount" = 10, "lockDuration" = "10s", "defaultMessageTimeToLive" = "10M" } subscription = topic.subscribe(subscriptionAttribs) messageHandlerOptions={ "autocomplete"="yes", "maxautorenewduration"="50M", "maxconsurrentcalls"=30 } renewMessageResponse=sb.registerMessageHandler(topic,subscription,messageHandlerOptions) writeDump(renewMessageResponse) </cfscript>
メッセージの破棄
サブスクリプションメッセージの破棄
サブスクリプションに対するすべてのメッセージを破棄します。
シンタックス
abandonSubscriptionMessage(String formattedSubscriptionPath, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) topicName = "topic_003"; topic = topicUtil.createTopic(sb, topicName, false, {}) //purgeAndRecreate, attribs originalLabel = "Red" adandonCountLabel = "adandonCount" subscriptionName = "sub_003" subscription = topic.subscribe({ "subscriptionName" = subscriptionName }) updatedMessageProperties = { "DeadLetterReason" = "reason for this", "DeadLetterErrorDescription" = "collateral" } abandonResponse = sb.abandonSubscriptionMessage(subscription.getTopicName(), subscription.getSubscriptionName(), updatedMessageProperties) writeDump(abandonResponse) </cfscript>
メッセージの破棄
ロックトークンを含むメッセージを破棄します。これにより、再度メッセージを処理できるようになります。メッセージの破棄により、メッセージの配信数が増加します。
シンタックス
abandonMessage(String queuePath, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) qName = "cfqa-abandon-msg" // キューを作成 queue = sb.createQueue(qName) oldLabel = "red" newLabel = "blue" sentMessage = sendMessage(queue, oldLabel) writeOutput("peeking at the queue: #qName#, before abandoning message.<br>") receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE for(message in receivedMessage){ //cfdump(var=#message#, label="message received.") abandonMessage(qName, message.lockToken, newLabel) } </cfscript>
メッセージの延期
メッセージの延期
キューまたはサブスクリプションクライアントが、処理対象だがアプリケーション内部の特殊事情により現在は処理できないメッセージを受信する場合、メッセージの取得を後に「延期する」オプションがあります。そのメッセージは、キューまたはサブスクリプションに残りますが、保留されます。
シンタックス
deferMessage(String queuePath, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) qName="queue_002" // キューを作成 queue = sb.createQueue(qName, {"lockDuration": "2s"}) // キューにメッセージを送信 msgBatch = arrayNew(1) for(n=1; n <= msgCount; n++){ msgInstance = { "messageId" = "#n#", "messageBody" = "msg time stamp - #now()#" } msgBatch[n] = msgInstance } sb.sendMessageBatch(qName, msgBatch) receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE // メッセージを延期 for(message in receivedMessage){ propertiesToModify = {"deferred_time:" : dateTimeFormat(now())} sb.deferMessage(qName, { "lockToken" = message.lockToken , "propertiesToModify" = propertiesToModify }) } </cfscript>
延期されたメッセージの受信
延期されたメッセージは、(サブキューに置かれる配信不能メッセージとは異なり)他のすべてのアクティブメッセージと共にメインキューに残りますが、通常の Receive/ReceiveAsync 関数を使用して受信することはできなくなります。延期されたメッセージは、アプリケーションが見失った場合、メッセージ参照を使用して見つけることができます。
シンタックス
receiveDeferredMessage(String queuePath, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) qName="queue_002" // キューを作成 queue = sb.createQueue(qName, {"lockDuration": "2s"}) // キューにメッセージを送信 msgBatch = arrayNew(1) for(n=1; n <= msgCount; n++){ msgInstance = { "messageId" = "#n#", "messageBody" = "msg time stamp - #now()#" } msgBatch[n] = msgInstance } sb.sendMessageBatch(qName, msgBatch) receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE // メッセージを延期 for(message in receivedMessage){ propertiesToModify = {"deferred_time:" : dateTimeFormat(now())} sb.deferMessage(qName, { "lockToken" = message.lockToken , "propertiesToModify" = propertiesToModify }) } // 延期されたメッセージを受信 receivedDeferredMessage = sb.receiveDeferredMessage(qName, { "sequenceNumber" : seqNo } ) writeDump(receivedDeferredMessage) </cfscript>
配信不能処理
Azure Service Bus キューおよびトピックサブスクリプションには、配信不能キューと呼ばれる、セカンダリサブキューが用意されています。配信不能キューは、明示的に作成する必要がなく、削除できません。配信不能キューの目的は、どのレシーバーにも配信できないメッセージや処理できないメッセージを保持することです。
配信不能メッセージ
シンタックス
deadLetterMessage(String queuePath, Map metadata)
配信不能サブスクリプションメッセージ
シンタックス
deadLetterSubscriptionMessage(String formattedSubscriptionPath, Map metadata)
ルール
フィルターまたはサブスクリプションルールは、そのトピックに送信されたメッセージのどれがトピックサブスクリプションによって受信される必要があるかを指定します。いつトピックサブスクリプションが作成されても、デフォルトのトピックサブスクリプションルールが作成され、これにより、そのトピックに送信されたすべてのメッセージをトピックサブスクリプションが受信できます。
サブスクリプションルールの追加
シンタックス
addSubscriptionRule(String topicPath, String subscriptionName, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // トピックを作成 topicName = "topic_001" topic = sb.createTopic(topicName) subscriptionName = "subscription_001" // トピックをサブスクライブ sb.subscribe( topicName, { "subscriptionName" = subscriptionName } ) // ルールを取得 filterRuleTrue = { "type" = "true", "name" = "trueFilter" } sb.addSubscriptionRule(topicName, subscriptonName, filterRuleTrue) </cfscript>
サブスクリプションルールの削除
シンタックス
removeSubscriptionRule(String topicPath, String subscriptionName, String ruleName)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // トピックを作成 topicName = "topic_001" topic = sb.createTopic(topicName) subscriptionName = "subscription_001" // トピックをサブスクライブ sb.subscribe( topicName, { "subscriptionName" = subscriptionName } ) // ルールを取得 filterRuleTrue = { "type" = "true", "name" = "trueFilter" } sb.addSubscriptionRule(topicName, subscriptonName, filterRuleTrue) removeRuleResponse=sb.removeSubscriptionRule(topicName, subscriptonName, filterRuleTrue) </cfscript>
サブスクリプションルールの取得
シンタックス
getSubscriptionRules(String formattedSubscriptionPath)
メッセージのスケジュール
キューまたはトピックへのメッセージをスケジュールできます。メッセージがスケジュールされた時間までそのメッセージがバスにキューされ、メッセージがレシーバー側に配信されます。これは、パフォーマンスを向上させるために、現在は非同期処理です。
キューへのメッセージのスケジュール
シンタックス
scheduleMessageToQueue(String queuePath, Map metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // キューを作成 myQueue=sb.createQueue("Q_006") schedMessage={ "messageId" = "001", "messageBody" = "message body", "ScheduledEnqueueTimeUtc"="02/02/2020" } scheduleResponse=sb.scheduleMessageToQueue(myQueue, schedMessage) writeDump(scheduleResponse) </cfscript>
トピックへのメッセージのスケジュール
シンタックス
scheduleMessageToTopic(String topicPath, Map metadata)
スケジュールされたトピックへのメッセージのキャンセル
シンタックス
cancelScheduledTopicMessage(String topicPath, long sequenceNumber)
スケジュールされたキューへのメッセージのキャンセル
シンタックス
cancelScheduledQueueMessage(String queuePath, long sequenceNumber)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) qName = "cancel-scheduled-1msg2" delayInSecs = 30 msgCount = 1 // キューを作成 queue = sb.createQueue(qName) if(!listFindNoCase(qUtil.getQueuesListStr(sb), qName)) writeOutput("queue #qName# NOT created.This is NOT expected.<br>") else writeOutput("queue #qName# created successfully.<br>") messageArrayToSend = getMessageArray(msgCount) seqNo = scheduler.scheduleMessage(sb, qName, messageArrayToSend[1], delayInSecs) writeOutput("cancelling message with sequence no: #seqNo#<br>") cancelMessageResponse = sb.cancelScheduledMessage(qName, seqNo) if(cancelMessageResponse[statusCode] NEQ 200) writeOutput("error cancelling msg for #qName#, seqno.#seqNo#<br>") </cfscript>
メッセージ参照
peek メソッドは、キューまたはサブスクリプションログに存在するすべてのメッセージを、最下位から最上位の順番で列挙します。
キューからのメッセージのピーク
シンタックス
peekMessageFromQueue(String queuePath, Struct metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) // キューを作成 qName="queue-005" queue = sb.createQueue(qName) peekProperties = { "fromSequenceNumber" = 1, "messageCount" = 10 } peekResponse = sb.peekMessageFromQueue(queue.getPath(), peekProperties) receivedMsgArray = peekResponse.messages writeDump(receivedMsgArray) </cfscript>
サブスクリプションからのメッセージのピーク
シンタックス
peekMessageFromSubscription(String topicPath, String subscriptionName, Struct metadata)
<cfscript> sb = getcloudService(this.serviceBusCred, this.serviceBusConf) qName = "q-msg-Peek-x4" msgCount = 3 msgLockDuration = 2 // キューを作成 queue=sb.createQueue(qName,{ "lockDuration"=msgLockDuration, "maxDeliveryCount"=msgCount }) // ピークプロパティを設定 peekProps = { "fromSequenceNumber" = 1, "messageCount" = msgCount } peekResponse = sb.peekMessage(queue.getPath(), peekProps) // サイズが msgCount - 1 の配列 if(peekResponse[statusCode] == 200){ receivedMessages = peekResponse.messages writeOutput("received message deserialized:<br>") } else writeOutput("message browsing failed") </cfscript>