Beantwoord

Microsoft Azure Stream Analytics export naar Blob Storage/ADLS Gen2 geeft fout in dataverzameling

  • 29 November 2022
  • 5 reacties
  • 51 keer bekeken

Goedemorgen!

Momenteel heb ik de volgende IOT setup:

Iphone met KPN iot test app → KPN iot platform → Azure IOT Hub → Azure Stream Analytics Job → Azure Blob Storage/ADLS Gen2

Wanneer ik metrische data verstuur via de app, zie ik die binnenkomen via het Device Twin overzicht in het KPN IOT platform. Daarna wordt de data overgestuurd naar de Azure IOT Hub. Daar zie ik dat de iPhone is geregistreerd als apparaat. In de ASA job zie ik, als ik een query "SELECT * FROM <input>” draai ook de verschillende id's binnenkomen met de bijbehorende metrische data van de iPhone. So far so good.

Maar daarna loop ik tegen een probleem aan, wanneer ik de ASA job start. Hij geeft iedere keer de melding dat de job faalt. Als ik in de JSON resultaat kijk, zie ik de volgende melding:

“JobFailedMessage": "The streaming job failed: Er zijn validatiefouten opgetreden voor de Stream Analytics-taak: De invoer yourinputalias die is gebruikt in de query is niet opgegeven."

Wanneer ik een beetje rondgoogle kom ik o.a. wat zaken tegen over non-conformiteit in de aangeleverde data. Maar veel verder kom ik niet. Is er een Azure ervaringsdeskundige die dit probleem in combinatie met KPN IOT platform heeft getackeld? Heb ik misschien een fout in de naamgeving van de input en de output bronnen in ASA? Ik loop hier nu echt op vast. Ik heb inmiddels al 10 keer de hele boel opnieuw geconfigureerd, maar nog steeds geen resultaat. Help?

icon

Beste antwoord door Alex Wesemann 1 December 2022, 13:23

Bekijk origineel

5 reacties

Reputatie 7
Badge +6

Goedemiddag @Alex Wesemann,

Welkom op het IoT Forum! 
Ik moet zeggen dat ik zelf niet de oplossing voor dit probleem heb en ik vraag me eerlijk gezegd af of wij hiermee kunnen helpen. Uiteraard laat ik het daar niet zomaar bij zitten en heb ik de hulp ingeschakeld van onze specialisten. Zodra ik een terugkoppeling ontvang kom ik direct bij je terug. Bedankt voor je geduld!  

Beste @Rick S. ,

Dank voor je reactie. Ik heb er eens een nachtje over geslapen en ik vermoed dat er een probleem optreedt bij het overhalen van de data uit de import tabel naar de uitvoertabel in de ASA (Azure Stream Analytics) job. Normaal gesproken luidt deze query:

SELECT * 

INTO <uitvoer-alias>

FROM <invoer-alias>

Hierbij is <uitvoer-alias> de in de ASA job opgegeven uitvoer en <invoer-alias> de in de ASA job opgegeven invoer.

Ik ga kijken of ik de tabelnamen op kan vragen via een aparte query en die dan gebruiken ipv de aliassen. Daarmee voorkom ik mogelijk deze foutmelding. Er is namelijk wel degelijk data opgehaald uit de IOT Hub en in de invoertabel terecht gekomen. Ik kan bijvoorbeeld de batterijstatus van de iphone zien en de versnelling op de x-as.

Een en ander wil ik later vandaag proberen en dan kom ik hier terug met de resultaten.

Wordt vervolgd.

gr, Alex

Beste @Rick S. ,

Volgens mij heb ik het probleem zelf opgelost, door gebruik te gaan maken van SQL Server in plaats van de blob uitvoer.

Gebruik maken van SQL Server had sowieso mijn voorkeur, mede vanwege mijn ervaring hier mee.

Het is belangrijk om gebruik te maken van een managed identity. Dit voorkomt allerlei ellende met rechten, machtigingen en toegang. Maak dus in Azure een managed identity aan op abonnement niveau met alle rechten. Er zijn genoeg handleidingen hiervoor te vinden, dus die zal ik niet beschrijven.

Om te beginnen heb ik een SQL Server aangemaakt en hierop de SQL Database aangemaakt. In de SQL database heb ik een tabel aangemaakt met de volgende query:

CREATE TABLE [dbo].[iothub](

[Id] [int] IDENTITY(1,1) NOT NULL,

[EventProcessedUtcTime] [datetime] NULL,

[bn] nvarchar NULL,

[bt] [float] NULL,

[EventEnqueuedUtcTime] [datetime] NULL,

[n] nvarchar NULL,

[u] nvarchar NULL,

[v] [numeric](18, 2) NULL,

[vs] nvarchar NULL,

[PartitionId] datetime2 NULL,

PRIMARY KEY CLUSTERED

(

[Id] ASC

)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]

) ON [PRIMARY]

GO

CREATE INDEX iothubbn

ON iothub (bn)

GO

CREATE INDEX iothubbt

ON iotthub (bt)

GO

(Met dank aan: https://docs.kpnthings.com/gs/azure/azure-sql?q=azure+iot+hub )

Ik maak gebruik van een IOT Hub, dus ik heb voor de duidelijkheid alle sleutelwaarden eventhub vervangen voor iothub.

Daarna heb ik de een nieuwe contained user in de database aangemaakt die dezelfde naam heeft als de ASA job:

CREATE USER [ASA_JOB_NAME] FROM EXTERNAL PROVIDER;

(vervang tussen de blokhaken de naam ASA-JOB-NAME door de naam van de ASA job naam die is aangemaakt)

Controleer daarna of de gebruiker correct is aangemaakt:

SELECT * FROM <SQL_DB_NAME>.sys.database_principals WHERE type_desc = 'EXTERNAL_USER'

Vervang <SQL_DB_NAME> voor [de databasenaam] inclusief de blokhaken

Geef vervolgens rechten aan de aangemaakte gebruiker om zowel te schrijven en op te vragen aan de ASA job.

De tabelnaam die hieronder moet worden ingevuld is gelijk aan de tabelnaam zoals in het create statement hierboven is opgegeven (in mijn geval was dit iothub)

GRANT CONNECT TO ASA_JOB_NAME;

GRANT SELECT, INSERT ON OBJECT::TABLE_NAME TO ASA_JOB_NAME;

Nu kan de output worden aangemaakt in de ASA job. Kies het abonnement en de resource. Vervolgens de database en geef aan dat er gebruik moet worden gemaakt van de managed identity voor toegang tot de database. Vul de tabelnaam in, waar moet worden weggeschreven (in dit geval dus iothub).

Daarna kan in het menu item Query<> van de ASA job de volgende testquery worden uitgevoerd om te kijken of er daadwerkelijke data vanuit de IOT hub kan worden weggeschreven in de SQL database:

SELECT

CASE

WHEN bn IS NULL THEN LAG(bn) OVER (PARTITION BY EventEnqueuedUtcTime LIMIT DURATION(second, 1)WHEN bn IS NOT NULL)

ELSE bn

END as bn,

CASE

WHEN bt IS NULL THEN LAG(bt) OVER (PARTITION BY EventEnqueuedUtcTime LIMIT DURATION(second, 1)WHEN bt IS NOT NULL)

ELSE bt

END as bt,

EventProcessedUtcTime,

EventEnqueuedUtcTime,

n,

u,

v,

vs,

PartitionId

INTO

[sqloutput]

FROM

[eventhubinput]

 

Let erop dat sqloutput vervangen moet worden door de gekozen output naam in de ASA job flow. Dit geldt ook voor de naam eventhubinput. Ik heb gewoon input en output gebruikt. Wel altijd de blokhaken eromheen zetten; anders bestaat de kans dat de query faalt.

Als er gegevens als resultaat in de tabel verschijnen, werkt de boel en kan de ASA job worden gestart.

Reputatie 7
Badge +6

Goedemiddag @Alex Wesemann,

Bedankt voor de zeer uitgebreide terugkoppeling! 
Super fijn om te lezen dat je het zelf op hebt kunnen lossen. Als er nog iets anders is of als we ergens anders bij mee moeten denken horen we het graag. 

Hele fijne dag verder!  

Beste @Rick S. 

Ik moet nog een kleine aanvulling doen op het geheel. In mijn situatie kreeg ik geen resultaten te zien in de tabel in SQL Server, omdat het in Azure er standaard wordt gewerkt met een tijd-localisatie van UTC. Dit loopt 1 uur achter op de standaard tijd bij ons. Ik heb het hele Case statement eruit gehaald en dit zou dus het juiste statement moeten zijn:

SELECT

EventProcessedUtcTime,

EventEnqueuedUtcTime,

n,

u,

v,

vs,

PartitionId

INTO

[sqloutput]

FROM

[eventhubinput]

Mochten er meer parameters worden binnengehaald, kan dit statement natuurlijk verder worden uitgebreid. De tabel in SQL Server heb ik ook nog wat verder verfijnd. Mocht er iemand met specifieke vragen zijn hierover, dan hoor ik het natuurlijk ook graag.

Reageer